use std::time::Instant;
use std::fmt;
use rand::random;
use serde::{Serialize, Deserialize};
/// Identifier of a node, implemented as a newtype around a raw `u64`.
///
/// The wrapped integer is a private field. Because the type is a plain
/// integer wrapper it derives `Eq` and `Hash` in addition to `PartialEq`,
/// so ids can be used directly as keys of hash-based collections.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeID(u64);
/// Per-node record pairing a node id with a timestamp.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct NCNodeInfo {
    /// Identifier of the node this record belongs to.
    pub(crate) node_id: NodeID,
    /// Timestamp stored for the node; heartbeat checks measure the time
    /// elapsed since this instant.
    pub(crate) instant: Instant,
}
/// Collection of the node records currently known.
///
/// Derives `Default` (an empty list) and `Debug` so the type can be built
/// with `NCNodeList::default()` and inspected in logs and assertions.
#[derive(Debug, Default)]
pub(crate) struct NCNodeList {
    /// One record per registered node.
    nodes: Vec<NCNodeInfo>,
}
impl NodeID {
    /// Returns the id whose raw value is `0`.
    ///
    /// NOTE(review): the name suggests this is the placeholder for "no id
    /// assigned yet" — confirm against the callers.
    pub(crate) fn unset() -> Self {
        NodeID(0)
    }

    /// Returns a randomly generated id that is never equal to
    /// [`NodeID::unset`].
    ///
    /// The underlying generator can yield any `u64`, including the raw value
    /// used by `unset()`. Such a draw is discarded and a new one is taken, so
    /// a generated id can always be told apart from the unset one.
    pub(crate) fn random() -> Self {
        loop {
            let raw: u64 = random();
            if raw != Self::unset().0 {
                return NodeID(raw);
            }
        }
    }
}
impl fmt::Display for NodeID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl NCNodeInfo {
pub(crate) fn new(node_id: NodeID) -> Self {
NCNodeInfo{ node_id, instant: Instant::now() }
}
pub(crate) fn update_heartbeat(&mut self) {
self.instant = Instant::now();
}
pub(crate) fn heartbeat_invalid(&self, limit: u64) -> bool {
let diff = Instant::now() - self.instant;
diff.as_secs() > limit
}
}
impl NCNodeList {
    /// Creates a list that contains no nodes.
    pub(crate) fn new() -> Self {
        Self { nodes: Vec::new() }
    }

    /// Lazily yields the id of every node whose heartbeat is invalid for the
    /// given limit.
    ///
    /// `heartbeat_duration` is passed unchanged to
    /// `NCNodeInfo::heartbeat_invalid` for each stored record; ids come out
    /// in the order the records are stored.
    pub(crate) fn check_heartbeat(&self, heartbeat_duration: u64) -> impl Iterator<Item=NodeID> + '_ {
        self.nodes.iter().filter_map(move |info| {
            if info.heartbeat_invalid(heartbeat_duration) {
                Some(info.node_id)
            } else {
                None
            }
        })
    }

    /// Registers a new node under a random id that no stored node uses yet
    /// and returns that id.
    ///
    /// The new record is appended to the end of the list.
    pub(crate) fn register_new_node(&mut self) -> NodeID {
        let mut candidate = NodeID::random();
        // Draw again for as long as the candidate collides with a known id.
        while self.nodes.iter().any(|known| known.node_id == candidate) {
            candidate = NodeID::random();
        }

        self.nodes.push(NCNodeInfo::new(candidate));
        candidate
    }

    /// Refreshes the heartbeat of the first stored node whose id equals
    /// `node_id`; does nothing when no such node exists.
    pub(crate) fn update_heartbeat(&mut self, node_id: NodeID) {
        if let Some(entry) = self.nodes.iter_mut().find(|entry| entry.node_id == node_id) {
            entry.update_heartbeat();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Blocks the current test thread for `secs` seconds.
    fn wait_secs(secs: u64) {
        thread::sleep(Duration::from_secs(secs));
    }

    /// Registers `count` nodes on `list` and returns their ids in
    /// registration order.
    fn register_many(list: &mut NCNodeList, count: usize) -> Vec<NodeID> {
        (0..count).map(|_| list.register_new_node()).collect()
    }

    /// A heartbeat is valid while within the limit and invalid once the
    /// limit has been exceeded.
    #[test]
    fn test_heartbeat_invalid() {
        let info = NCNodeInfo::new(NodeID::unset());

        wait_secs(3);
        // Roughly 3 s elapsed: still inside the 5 s limit.
        assert!(!info.heartbeat_invalid(5));

        wait_secs(3);
        // Roughly 6 s elapsed: the 5 s limit is exceeded.
        assert!(info.heartbeat_invalid(5));
    }

    /// Refreshing the heartbeat makes an expired record valid again.
    #[test]
    fn test_update_heartbeat() {
        let mut info = NCNodeInfo::new(NodeID::unset());

        wait_secs(5);
        assert!(info.heartbeat_invalid(3));

        info.update_heartbeat();
        assert!(!info.heartbeat_invalid(3));
    }

    /// Each registration appends one record and hands out a distinct id.
    #[test]
    fn test_register_new_node() {
        let mut list = NCNodeList::new();

        let first = list.register_new_node();
        assert_eq!(list.nodes.len(), 1);
        assert_eq!(list.nodes[0].node_id, first);

        let second = list.register_new_node();
        assert_eq!(list.nodes.len(), 2);
        assert_ne!(list.nodes[0].node_id, second);
        assert_eq!(list.nodes[1].node_id, second);
    }

    /// Freshly registered nodes are all valid; after waiting past the limit
    /// every one of them is reported.
    #[test]
    fn test_node_list_check_heartbeat() {
        let mut list = NCNodeList::new();
        let _ = register_many(&mut list, 4);

        assert_eq!(list.check_heartbeat(5).count(), 0);

        wait_secs(5);
        assert_eq!(list.check_heartbeat(3).count(), 4);
    }

    /// Only the node whose heartbeat was refreshed is left out of the
    /// expired set.
    #[test]
    fn test_node_list_update_heartbeat() {
        let mut list = NCNodeList::new();
        let ids = register_many(&mut list, 4);
        // The third registered node is the one that gets refreshed.
        let refreshed = ids[2];

        wait_secs(5);
        list.update_heartbeat(refreshed);

        let expired: Vec<NodeID> = list.check_heartbeat(3).collect();
        assert_eq!(expired.len(), 3);
        assert!(expired.iter().all(|id| *id != refreshed));
    }
}