1use crate::remote::system::NodeId;
2
3use hashring::HashRing;
4
5use crate::remote::config::SystemCapabilities;
6use crate::remote::net::message::{datetime_to_timestamp, timestamp_to_datetime};
7use crate::remote::net::proto::network;
8use chrono::{DateTime, Utc};
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11use std::sync::Arc;
12use std::time::Duration;
13
14pub struct RemoteNodeStore {
15 nodes: HashMap<NodeId, RemoteNodeState>,
16 table: HashRing<RemoteNode>,
17}
18
19#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
20pub enum NodeStatus {
21 Joining,
22 Healthy,
23 Unhealthy,
24 Terminated,
25}
26
27impl NodeStatus {
28 pub fn is_healthy(&self) -> bool {
29 return matches!(&self, Self::Healthy);
30 }
31}
32
33pub type NodeAttributes = HashMap<Arc<str>, Arc<str>>;
34
35pub type NodeAttributesRef = Arc<NodeAttributes>;
36
37#[derive(Debug, Clone)]
38pub struct RemoteNodeState {
39 pub id: NodeId,
40 pub addr: String,
41 pub tag: String,
42 pub ping_latency: Option<Duration>,
43 pub last_heartbeat: Option<DateTime<Utc>>,
44 pub node_started_at: Option<DateTime<Utc>>,
45 pub status: NodeStatus,
46 pub attributes: NodeAttributesRef,
47}
48
49#[derive(Debug, Clone)]
50pub struct RemoteNode {
51 pub id: NodeId,
52 pub addr: String,
53 pub tag: String,
54 pub node_started_at: Option<DateTime<Utc>>,
55 pub attributes: NodeAttributesRef,
56}
57
58impl Hash for RemoteNode {
59 fn hash<H: Hasher>(&self, state: &mut H) {
60 self.id.hash(state);
61 self.addr.hash(state);
62 self.tag.hash(state);
63 self.node_started_at.hash(state);
64 }
65}
66
67#[derive(Clone)]
68pub struct NodeIdentity {
69 pub node: RemoteNode,
70 pub peers: Vec<RemoteNode>,
71 pub capabilities: SystemCapabilities,
72}
73
74impl RemoteNodeStore {
75 pub fn new(nodes: Vec<RemoteNode>) -> RemoteNodeStore {
76 let mut table = HashRing::new();
77
78 let nodes = nodes
79 .into_iter()
80 .map(|n| {
81 table.add(n.clone());
82 (n.id, RemoteNodeState::new(n))
83 })
84 .collect();
85
86 RemoteNodeStore { table, nodes }
87 }
88
89 pub fn update_nodes(&mut self, nodes: Vec<RemoteNodeState>) {
90 for node in nodes {
91 self.nodes.insert(node.id, node);
92 }
93 }
94
95 pub fn node_terminated(&mut self, node_id: NodeId) {
96 let node = self.get_mut(&node_id);
97 if let Some(node) = node {
98 node.status = NodeStatus::Terminated;
99 }
100 }
101
102 pub fn get(&self, node_id: &NodeId) -> Option<&RemoteNodeState> {
103 self.nodes.get(node_id)
104 }
105
106 pub fn get_mut(&mut self, node_id: &NodeId) -> Option<&mut RemoteNodeState> {
107 self.nodes.get_mut(node_id)
108 }
109
110 pub fn is_registered(&self, node_id: NodeId) -> bool {
111 self.nodes.contains_key(&node_id)
112 }
113
114 pub fn remove(&mut self, node_id: &NodeId) -> Option<RemoteNode> {
115 self.nodes.remove(&node_id).and_then(|node| {
116 let node = node.into();
117 self.table.remove(&node)
118 })
119 }
120
121 pub fn get_by_key(&mut self, key: impl Hash) -> Option<&RemoteNode> {
122 self.table.get(&key)
123 }
124
125 pub fn add(&mut self, node: RemoteNode) {
126 let mut nodes = self.get_all();
127 nodes.push(RemoteNodeState::new(node));
128 nodes.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
129
130 self.table = HashRing::new();
131 self.nodes = nodes
132 .into_iter()
133 .map(|n| {
134 let node = n.clone();
135 self.table.add(n.into());
136 (node.id, node)
137 })
138 .collect();
139 }
140
141 pub fn get_all(&self) -> Vec<RemoteNodeState> {
142 self.nodes.values().cloned().collect()
143 }
144}
145
146impl RemoteNodeState {
147 pub fn new(node: RemoteNode) -> Self {
148 let id = node.id;
149 let addr = node.addr;
150 let node_started_at = node.node_started_at;
151 let tag = node.tag;
152
153 Self {
154 id,
155 addr,
156 node_started_at,
157 tag,
158 ping_latency: None,
159 last_heartbeat: None,
160 status: NodeStatus::Joining,
161 attributes: node.attributes.clone(),
162 }
163 }
164}
165
166impl From<RemoteNodeState> for RemoteNode {
167 fn from(s: RemoteNodeState) -> Self {
168 Self {
169 id: s.id,
170 addr: s.addr,
171 tag: s.tag,
172 node_started_at: s.node_started_at,
173 attributes: s.attributes.clone(),
174 }
175 }
176}
177
178impl From<network::RemoteNode> for RemoteNode {
179 fn from(n: network::RemoteNode) -> Self {
180 Self {
181 id: n.node_id,
182 addr: n.addr,
183 tag: n.tag,
184 node_started_at: n.node_started_at.into_option().map(timestamp_to_datetime),
185 attributes: n
186 .attributes
187 .into_iter()
188 .map(|(k, v)| (k.into(), v.into()))
189 .collect::<NodeAttributes>()
190 .into(),
191 }
192 }
193}
194
195impl From<RemoteNode> for network::RemoteNode {
196 fn from(n: RemoteNode) -> Self {
197 Self {
198 node_id: n.id,
199 addr: n.addr,
200 tag: n.tag,
201 node_started_at: n.node_started_at.as_ref().map(datetime_to_timestamp).into(),
202 attributes: n
203 .attributes
204 .iter()
205 .map(|(k, v)| (k.to_string(), v.to_string()))
206 .collect(),
207 ..Self::default()
208 }
209 }
210}
211
212impl From<&RemoteNode> for network::RemoteNode {
213 fn from(n: &RemoteNode) -> Self {
214 Self {
215 node_id: n.id,
216 addr: n.addr.clone(),
217 tag: n.tag.clone(),
218 node_started_at: n.node_started_at.as_ref().map(datetime_to_timestamp).into(),
219 attributes: n
220 .attributes
221 .iter()
222 .map(|(k, v)| (k.to_string(), v.to_string()))
223 .collect(),
224 ..Self::default()
225 }
226 }
227}
228
229impl From<RemoteNodeState> for network::RemoteNode {
230 fn from(s: RemoteNodeState) -> Self {
231 Self {
232 node_id: s.id,
233 addr: s.addr.clone(),
234 tag: s.tag.clone(),
235 node_started_at: s.node_started_at.as_ref().map(datetime_to_timestamp).into(),
236 attributes: s
237 .attributes
238 .iter()
239 .map(|(k, v)| (k.to_string(), v.to_string()))
240 .collect(),
241 ..Self::default()
242 }
243 }
244}
245
246impl From<&network::NodeIdentity> for RemoteNode {
247 fn from(n: &network::NodeIdentity) -> Self {
248 RemoteNode {
249 id: n.node_id,
250 addr: n.addr.clone(),
251 tag: n.node_tag.clone(),
252 node_started_at: n
253 .node_started_at
254 .clone()
255 .into_option()
256 .map(timestamp_to_datetime),
257 attributes: n
258 .attributes
259 .iter()
260 .map(|(k, v)| (k.clone().into(), v.clone().into()))
261 .collect::<NodeAttributes>()
262 .into(),
263 }
264 }
265}
266
267impl Default for RemoteNodeState {
268 fn default() -> Self {
269 RemoteNodeState {
270 id: NodeId::default(),
271 addr: String::default(),
272 tag: String::default(),
273 status: NodeStatus::Joining,
274 ping_latency: None,
275 last_heartbeat: None,
276 node_started_at: None,
277 attributes: Arc::new(NodeAttributes::new()),
278 }
279 }
280}
281
282impl RemoteNode {
283 pub fn new(
284 id: u64,
285 addr: String,
286 tag: String,
287 node_started_at: Option<DateTime<Utc>>,
288 attributes: NodeAttributesRef,
289 ) -> RemoteNode {
290 RemoteNode {
291 id,
292 addr,
293 tag,
294 node_started_at,
295 attributes,
296 }
297 }
298}
299
300impl ToString for RemoteNode {
301 fn to_string(&self) -> String {
302 format!("{}|{}", self.addr, self.id)
303 }
304}
305
306impl PartialEq for RemoteNode {
307 fn eq(&self, other: &RemoteNode) -> bool {
308 self.id == other.id && self.addr == other.addr
309 }
310}
311
312impl PartialEq for RemoteNodeState {
313 fn eq(&self, other: &RemoteNodeState) -> bool {
314 self.id == other.id && self.addr == other.addr
315 }
316}