coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
use crate::remote::system::NodeId;

use hashring::HashRing;

use crate::remote::config::SystemCapabilities;
use crate::remote::net::message::{datetime_to_timestamp, timestamp_to_datetime};
use crate::remote::net::proto::network;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;

pub struct RemoteNodeStore {
    nodes: HashMap<NodeId, RemoteNodeState>,
    table: HashRing<RemoteNode>,
}

#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
pub enum NodeStatus {
    Joining,
    Healthy,
    Unhealthy,
    Terminated,
}

impl NodeStatus {
    pub fn is_healthy(&self) -> bool {
        return matches!(&self, Self::Healthy);
    }
}

pub type NodeAttributes = HashMap<Arc<str>, Arc<str>>;

pub type NodeAttributesRef = Arc<NodeAttributes>;

#[derive(Debug, Clone)]
pub struct RemoteNodeState {
    pub id: NodeId,
    pub addr: String,
    pub tag: String,
    pub ping_latency: Option<Duration>,
    pub last_heartbeat: Option<DateTime<Utc>>,
    pub node_started_at: Option<DateTime<Utc>>,
    pub status: NodeStatus,
    pub attributes: NodeAttributesRef,
}

#[derive(Debug, Clone)]
pub struct RemoteNode {
    pub id: NodeId,
    pub addr: String,
    pub tag: String,
    pub node_started_at: Option<DateTime<Utc>>,
    pub attributes: NodeAttributesRef,
}

impl Hash for RemoteNode {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.addr.hash(state);
        self.tag.hash(state);
        self.node_started_at.hash(state);
    }
}

#[derive(Clone)]
pub struct NodeIdentity {
    pub node: RemoteNode,
    pub peers: Vec<RemoteNode>,
    pub capabilities: SystemCapabilities,
}

impl RemoteNodeStore {
    pub fn new(nodes: Vec<RemoteNode>) -> RemoteNodeStore {
        let mut table = HashRing::new();

        let nodes = nodes
            .into_iter()
            .map(|n| {
                table.add(n.clone());
                (n.id, RemoteNodeState::new(n))
            })
            .collect();

        RemoteNodeStore { table, nodes }
    }

    pub fn update_nodes(&mut self, nodes: Vec<RemoteNodeState>) {
        for node in nodes {
            self.nodes.insert(node.id, node);
        }
    }

    pub fn node_terminated(&mut self, node_id: NodeId) {
        let node = self.get_mut(&node_id);
        if let Some(node) = node {
            node.status = NodeStatus::Terminated;
        }
    }

    pub fn get(&self, node_id: &NodeId) -> Option<&RemoteNodeState> {
        self.nodes.get(node_id)
    }

    pub fn get_mut(&mut self, node_id: &NodeId) -> Option<&mut RemoteNodeState> {
        self.nodes.get_mut(node_id)
    }

    pub fn is_registered(&self, node_id: NodeId) -> bool {
        self.nodes.contains_key(&node_id)
    }

    pub fn remove(&mut self, node_id: &NodeId) -> Option<RemoteNode> {
        self.nodes.remove(&node_id).and_then(|node| {
            let node = node.into();
            self.table.remove(&node)
        })
    }

    pub fn get_by_key(&mut self, key: impl Hash) -> Option<&RemoteNode> {
        self.table.get(&key)
    }

    pub fn add(&mut self, node: RemoteNode) {
        let mut nodes = self.get_all();
        nodes.push(RemoteNodeState::new(node));
        nodes.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());

        self.table = HashRing::new();
        self.nodes = nodes
            .into_iter()
            .map(|n| {
                let node = n.clone();
                self.table.add(n.into());
                (node.id, node)
            })
            .collect();
    }

    pub fn get_all(&self) -> Vec<RemoteNodeState> {
        self.nodes.values().cloned().collect()
    }
}

impl RemoteNodeState {
    pub fn new(node: RemoteNode) -> Self {
        let id = node.id;
        let addr = node.addr;
        let node_started_at = node.node_started_at;
        let tag = node.tag;

        Self {
            id,
            addr,
            node_started_at,
            tag,
            ping_latency: None,
            last_heartbeat: None,
            status: NodeStatus::Joining,
            attributes: node.attributes.clone(),
        }
    }
}

impl From<RemoteNodeState> for RemoteNode {
    fn from(s: RemoteNodeState) -> Self {
        Self {
            id: s.id,
            addr: s.addr,
            tag: s.tag,
            node_started_at: s.node_started_at,
            attributes: s.attributes.clone(),
        }
    }
}

impl From<network::RemoteNode> for RemoteNode {
    fn from(n: network::RemoteNode) -> Self {
        Self {
            id: n.node_id,
            addr: n.addr,
            tag: n.tag,
            node_started_at: n.node_started_at.into_option().map(timestamp_to_datetime),
            attributes: n
                .attributes
                .into_iter()
                .map(|(k, v)| (k.into(), v.into()))
                .collect::<NodeAttributes>()
                .into(),
        }
    }
}

impl From<RemoteNode> for network::RemoteNode {
    fn from(n: RemoteNode) -> Self {
        Self {
            node_id: n.id,
            addr: n.addr,
            tag: n.tag,
            node_started_at: n.node_started_at.as_ref().map(datetime_to_timestamp).into(),
            attributes: n
                .attributes
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            ..Self::default()
        }
    }
}

impl From<&RemoteNode> for network::RemoteNode {
    fn from(n: &RemoteNode) -> Self {
        Self {
            node_id: n.id,
            addr: n.addr.clone(),
            tag: n.tag.clone(),
            node_started_at: n.node_started_at.as_ref().map(datetime_to_timestamp).into(),
            attributes: n
                .attributes
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            ..Self::default()
        }
    }
}

impl From<RemoteNodeState> for network::RemoteNode {
    fn from(s: RemoteNodeState) -> Self {
        Self {
            node_id: s.id,
            addr: s.addr.clone(),
            tag: s.tag.clone(),
            node_started_at: s.node_started_at.as_ref().map(datetime_to_timestamp).into(),
            attributes: s
                .attributes
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            ..Self::default()
        }
    }
}

impl From<&network::NodeIdentity> for RemoteNode {
    fn from(n: &network::NodeIdentity) -> Self {
        RemoteNode {
            id: n.node_id,
            addr: n.addr.clone(),
            tag: n.node_tag.clone(),
            node_started_at: n
                .node_started_at
                .clone()
                .into_option()
                .map(timestamp_to_datetime),
            attributes: n
                .attributes
                .iter()
                .map(|(k, v)| (k.clone().into(), v.clone().into()))
                .collect::<NodeAttributes>()
                .into(),
        }
    }
}

impl Default for RemoteNodeState {
    fn default() -> Self {
        RemoteNodeState {
            id: NodeId::default(),
            addr: String::default(),
            tag: String::default(),
            status: NodeStatus::Joining,
            ping_latency: None,
            last_heartbeat: None,
            node_started_at: None,
            attributes: Arc::new(NodeAttributes::new()),
        }
    }
}

impl RemoteNode {
    pub fn new(
        id: u64,
        addr: String,
        tag: String,
        node_started_at: Option<DateTime<Utc>>,
        attributes: NodeAttributesRef,
    ) -> RemoteNode {
        RemoteNode {
            id,
            addr,
            tag,
            node_started_at,
            attributes,
        }
    }
}

impl ToString for RemoteNode {
    fn to_string(&self) -> String {
        format!("{}|{}", self.addr, self.id)
    }
}

impl PartialEq for RemoteNode {
    fn eq(&self, other: &RemoteNode) -> bool {
        self.id == other.id && self.addr == other.addr
    }
}

impl PartialEq for RemoteNodeState {
    fn eq(&self, other: &RemoteNodeState) -> bool {
        self.id == other.id && self.addr == other.addr
    }
}