// coerce/remote/cluster/node.rs

use std::collections::HashMap;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use hashring::HashRing;

use crate::remote::config::SystemCapabilities;
use crate::remote::net::message::{datetime_to_timestamp, timestamp_to_datetime};
use crate::remote::net::proto::network;
use crate::remote::system::NodeId;
13
14pub struct RemoteNodeStore {
15    nodes: HashMap<NodeId, RemoteNodeState>,
16    table: HashRing<RemoteNode>,
17}
18
19#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
20pub enum NodeStatus {
21    Joining,
22    Healthy,
23    Unhealthy,
24    Terminated,
25}
26
27impl NodeStatus {
28    pub fn is_healthy(&self) -> bool {
29        return matches!(&self, Self::Healthy);
30    }
31}
32
/// Key/value attributes attached to a node; keys and values are shared
/// string slices (`Arc<str>`).
pub type NodeAttributes = HashMap<Arc<str>, Arc<str>>;

/// Reference-counted handle to a [`NodeAttributes`] map.
pub type NodeAttributesRef = Arc<NodeAttributes>;
36
37#[derive(Debug, Clone)]
38pub struct RemoteNodeState {
39    pub id: NodeId,
40    pub addr: String,
41    pub tag: String,
42    pub ping_latency: Option<Duration>,
43    pub last_heartbeat: Option<DateTime<Utc>>,
44    pub node_started_at: Option<DateTime<Utc>>,
45    pub status: NodeStatus,
46    pub attributes: NodeAttributesRef,
47}
48
49#[derive(Debug, Clone)]
50pub struct RemoteNode {
51    pub id: NodeId,
52    pub addr: String,
53    pub tag: String,
54    pub node_started_at: Option<DateTime<Utc>>,
55    pub attributes: NodeAttributesRef,
56}
57
58impl Hash for RemoteNode {
59    fn hash<H: Hasher>(&self, state: &mut H) {
60        self.id.hash(state);
61        self.addr.hash(state);
62        self.tag.hash(state);
63        self.node_started_at.hash(state);
64    }
65}
66
67#[derive(Clone)]
68pub struct NodeIdentity {
69    pub node: RemoteNode,
70    pub peers: Vec<RemoteNode>,
71    pub capabilities: SystemCapabilities,
72}
73
74impl RemoteNodeStore {
75    pub fn new(nodes: Vec<RemoteNode>) -> RemoteNodeStore {
76        let mut table = HashRing::new();
77
78        let nodes = nodes
79            .into_iter()
80            .map(|n| {
81                table.add(n.clone());
82                (n.id, RemoteNodeState::new(n))
83            })
84            .collect();
85
86        RemoteNodeStore { table, nodes }
87    }
88
89    pub fn update_nodes(&mut self, nodes: Vec<RemoteNodeState>) {
90        for node in nodes {
91            self.nodes.insert(node.id, node);
92        }
93    }
94
95    pub fn node_terminated(&mut self, node_id: NodeId) {
96        let node = self.get_mut(&node_id);
97        if let Some(node) = node {
98            node.status = NodeStatus::Terminated;
99        }
100    }
101
102    pub fn get(&self, node_id: &NodeId) -> Option<&RemoteNodeState> {
103        self.nodes.get(node_id)
104    }
105
106    pub fn get_mut(&mut self, node_id: &NodeId) -> Option<&mut RemoteNodeState> {
107        self.nodes.get_mut(node_id)
108    }
109
110    pub fn is_registered(&self, node_id: NodeId) -> bool {
111        self.nodes.contains_key(&node_id)
112    }
113
114    pub fn remove(&mut self, node_id: &NodeId) -> Option<RemoteNode> {
115        self.nodes.remove(&node_id).and_then(|node| {
116            let node = node.into();
117            self.table.remove(&node)
118        })
119    }
120
121    pub fn get_by_key(&mut self, key: impl Hash) -> Option<&RemoteNode> {
122        self.table.get(&key)
123    }
124
125    pub fn add(&mut self, node: RemoteNode) {
126        let mut nodes = self.get_all();
127        nodes.push(RemoteNodeState::new(node));
128        nodes.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
129
130        self.table = HashRing::new();
131        self.nodes = nodes
132            .into_iter()
133            .map(|n| {
134                let node = n.clone();
135                self.table.add(n.into());
136                (node.id, node)
137            })
138            .collect();
139    }
140
141    pub fn get_all(&self) -> Vec<RemoteNodeState> {
142        self.nodes.values().cloned().collect()
143    }
144}
145
146impl RemoteNodeState {
147    pub fn new(node: RemoteNode) -> Self {
148        let id = node.id;
149        let addr = node.addr;
150        let node_started_at = node.node_started_at;
151        let tag = node.tag;
152
153        Self {
154            id,
155            addr,
156            node_started_at,
157            tag,
158            ping_latency: None,
159            last_heartbeat: None,
160            status: NodeStatus::Joining,
161            attributes: node.attributes.clone(),
162        }
163    }
164}
165
166impl From<RemoteNodeState> for RemoteNode {
167    fn from(s: RemoteNodeState) -> Self {
168        Self {
169            id: s.id,
170            addr: s.addr,
171            tag: s.tag,
172            node_started_at: s.node_started_at,
173            attributes: s.attributes.clone(),
174        }
175    }
176}
177
178impl From<network::RemoteNode> for RemoteNode {
179    fn from(n: network::RemoteNode) -> Self {
180        Self {
181            id: n.node_id,
182            addr: n.addr,
183            tag: n.tag,
184            node_started_at: n.node_started_at.into_option().map(timestamp_to_datetime),
185            attributes: n
186                .attributes
187                .into_iter()
188                .map(|(k, v)| (k.into(), v.into()))
189                .collect::<NodeAttributes>()
190                .into(),
191        }
192    }
193}
194
195impl From<RemoteNode> for network::RemoteNode {
196    fn from(n: RemoteNode) -> Self {
197        Self {
198            node_id: n.id,
199            addr: n.addr,
200            tag: n.tag,
201            node_started_at: n.node_started_at.as_ref().map(datetime_to_timestamp).into(),
202            attributes: n
203                .attributes
204                .iter()
205                .map(|(k, v)| (k.to_string(), v.to_string()))
206                .collect(),
207            ..Self::default()
208        }
209    }
210}
211
212impl From<&RemoteNode> for network::RemoteNode {
213    fn from(n: &RemoteNode) -> Self {
214        Self {
215            node_id: n.id,
216            addr: n.addr.clone(),
217            tag: n.tag.clone(),
218            node_started_at: n.node_started_at.as_ref().map(datetime_to_timestamp).into(),
219            attributes: n
220                .attributes
221                .iter()
222                .map(|(k, v)| (k.to_string(), v.to_string()))
223                .collect(),
224            ..Self::default()
225        }
226    }
227}
228
229impl From<RemoteNodeState> for network::RemoteNode {
230    fn from(s: RemoteNodeState) -> Self {
231        Self {
232            node_id: s.id,
233            addr: s.addr.clone(),
234            tag: s.tag.clone(),
235            node_started_at: s.node_started_at.as_ref().map(datetime_to_timestamp).into(),
236            attributes: s
237                .attributes
238                .iter()
239                .map(|(k, v)| (k.to_string(), v.to_string()))
240                .collect(),
241            ..Self::default()
242        }
243    }
244}
245
246impl From<&network::NodeIdentity> for RemoteNode {
247    fn from(n: &network::NodeIdentity) -> Self {
248        RemoteNode {
249            id: n.node_id,
250            addr: n.addr.clone(),
251            tag: n.node_tag.clone(),
252            node_started_at: n
253                .node_started_at
254                .clone()
255                .into_option()
256                .map(timestamp_to_datetime),
257            attributes: n
258                .attributes
259                .iter()
260                .map(|(k, v)| (k.clone().into(), v.clone().into()))
261                .collect::<NodeAttributes>()
262                .into(),
263        }
264    }
265}
266
267impl Default for RemoteNodeState {
268    fn default() -> Self {
269        RemoteNodeState {
270            id: NodeId::default(),
271            addr: String::default(),
272            tag: String::default(),
273            status: NodeStatus::Joining,
274            ping_latency: None,
275            last_heartbeat: None,
276            node_started_at: None,
277            attributes: Arc::new(NodeAttributes::new()),
278        }
279    }
280}
281
282impl RemoteNode {
283    pub fn new(
284        id: u64,
285        addr: String,
286        tag: String,
287        node_started_at: Option<DateTime<Utc>>,
288        attributes: NodeAttributesRef,
289    ) -> RemoteNode {
290        RemoteNode {
291            id,
292            addr,
293            tag,
294            node_started_at,
295            attributes,
296        }
297    }
298}
299
300impl ToString for RemoteNode {
301    fn to_string(&self) -> String {
302        format!("{}|{}", self.addr, self.id)
303    }
304}
305
306impl PartialEq for RemoteNode {
307    fn eq(&self, other: &RemoteNode) -> bool {
308        self.id == other.id && self.addr == other.addr
309    }
310}
311
312impl PartialEq for RemoteNodeState {
313    fn eq(&self, other: &RemoteNodeState) -> bool {
314        self.id == other.id && self.addr == other.addr
315    }
316}