1use crate::config::ResourceLimits;
6use crate::errors::{PolarisError, PolarisResult};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::fmt;
10use std::net::SocketAddr;
11use std::sync::Arc;
12use uuid::Uuid;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct NodeId(Uuid);
17
18impl NodeId {
19 pub fn new() -> Self {
21 Self(Uuid::new_v4())
22 }
23
24 pub fn as_uuid(&self) -> &Uuid {
26 &self.0
27 }
28}
29
30impl Default for NodeId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl fmt::Display for NodeId {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
42impl From<Uuid> for NodeId {
43 fn from(uuid: Uuid) -> Self {
44 Self(uuid)
45 }
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[non_exhaustive]
51pub enum NodeStatus {
52 Starting,
54 Ready,
56 Busy,
58 Overloaded,
60 Draining,
62 Disconnected,
64 Failed,
66}
67
68impl fmt::Display for NodeStatus {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 match self {
71 Self::Starting => write!(f, "starting"),
72 Self::Ready => write!(f, "ready"),
73 Self::Busy => write!(f, "busy"),
74 Self::Overloaded => write!(f, "overloaded"),
75 Self::Draining => write!(f, "draining"),
76 Self::Disconnected => write!(f, "disconnected"),
77 Self::Failed => write!(f, "failed"),
78 }
79 }
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct NodeInfo {
85 pub id: NodeId,
87
88 pub name: String,
90
91 pub addr: SocketAddr,
93
94 pub status: NodeStatus,
96
97 pub resource_limits: ResourceLimits,
99
100 pub resource_usage: ResourceUsage,
102
103 pub metadata: NodeMetadata,
105}
106
107impl NodeInfo {
108 pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
110 Self {
111 id: NodeId::new(),
112 name: name.into(),
113 addr,
114 status: NodeStatus::Starting,
115 resource_limits: ResourceLimits::default(),
116 resource_usage: ResourceUsage::default(),
117 metadata: NodeMetadata::default(),
118 }
119 }
120
121 pub fn is_available(&self) -> bool {
123 matches!(self.status, NodeStatus::Ready | NodeStatus::Busy)
124 }
125
126 pub fn is_healthy(&self) -> bool {
128 matches!(
129 self.status,
130 NodeStatus::Ready | NodeStatus::Busy | NodeStatus::Overloaded
131 )
132 }
133
134 pub fn load_factor(&self) -> f64 {
136 if self.resource_limits.max_concurrent_tasks == 0 {
137 return 0.0;
138 }
139 self.resource_usage.active_tasks as f64 / self.resource_limits.max_concurrent_tasks as f64
140 }
141}
142
143#[derive(Debug, Clone, Default, Serialize, Deserialize)]
145pub struct ResourceUsage {
146 pub active_tasks: usize,
148
149 pub cpu_usage_percent: f64,
151
152 pub memory_usage_bytes: u64,
154
155 pub network_usage_bytes_per_sec: u64,
157}
158
159#[derive(Debug, Clone, Default, Serialize, Deserialize)]
161pub struct NodeMetadata {
162 pub started_at: Option<DateTime<Utc>>,
164
165 pub last_heartbeat: Option<DateTime<Utc>>,
167
168 pub version: Option<String>,
170
171 pub os: Option<String>,
173
174 pub arch: Option<String>,
176
177 pub labels: std::collections::HashMap<String, String>,
179}
180
181#[derive(Clone)]
183pub struct Node {
184 info: Arc<parking_lot::RwLock<NodeInfo>>,
185}
186
187impl Node {
188 pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
190 Self {
191 info: Arc::new(parking_lot::RwLock::new(NodeInfo::new(name, addr))),
192 }
193 }
194
195 pub fn id(&self) -> NodeId {
197 self.info.read().id
198 }
199
200 pub fn info(&self) -> NodeInfo {
202 self.info.read().clone()
203 }
204
205 pub fn status(&self) -> NodeStatus {
207 self.info.read().status
208 }
209
210 pub fn set_status(&self, status: NodeStatus) {
212 self.info.write().status = status;
213 }
214
215 pub fn is_available(&self) -> bool {
217 self.info.read().is_available()
218 }
219
220 pub fn load_factor(&self) -> f64 {
222 self.info.read().load_factor()
223 }
224
225 pub fn update_usage(&self, usage: ResourceUsage) {
227 self.info.write().resource_usage = usage;
228 }
229
230 pub fn record_heartbeat(&self) {
232 self.info.write().metadata.last_heartbeat = Some(Utc::now());
233 }
234
235 pub fn increment_active_tasks(&self) {
237 self.info.write().resource_usage.active_tasks += 1;
238 }
239
240 pub fn decrement_active_tasks(&self) {
242 let mut info = self.info.write();
243 if info.resource_usage.active_tasks > 0 {
244 info.resource_usage.active_tasks -= 1;
245 }
246 }
247}
248
249impl fmt::Debug for Node {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 let info = self.info.read();
252 f.debug_struct("Node")
253 .field("id", &info.id)
254 .field("name", &info.name)
255 .field("status", &info.status)
256 .finish()
257 }
258}
259
260#[derive(Debug, Clone)]
262pub struct NodeRegistry {
263 nodes: Arc<dashmap::DashMap<NodeId, Node>>,
264}
265
266impl NodeRegistry {
267 pub fn new() -> Self {
269 Self {
270 nodes: Arc::new(dashmap::DashMap::new()),
271 }
272 }
273
274 pub fn register(&self, node: Node) -> PolarisResult<()> {
276 let id = node.id();
277 if self.nodes.contains_key(&id) {
278 return Err(PolarisError::other(format!("Node {} already registered", id)));
279 }
280 self.nodes.insert(id, node);
281 tracing::info!(node_id = %id, "Node registered");
282 Ok(())
283 }
284
285 pub fn unregister(&self, id: NodeId) -> PolarisResult<()> {
287 self.nodes
288 .remove(&id)
289 .ok_or_else(|| PolarisError::NodeNotFound(id.to_string()))?;
290 tracing::info!(node_id = %id, "Node unregistered");
291 Ok(())
292 }
293
294 pub fn get(&self, id: NodeId) -> Option<Node> {
296 self.nodes.get(&id).map(|entry| entry.value().clone())
297 }
298
299 pub fn all(&self) -> Vec<Node> {
301 self.nodes.iter().map(|entry| entry.value().clone()).collect()
302 }
303
304 pub fn available(&self) -> Vec<Node> {
306 self.nodes
307 .iter()
308 .map(|entry| entry.value().clone())
309 .filter(|node| node.is_available())
310 .collect()
311 }
312
313 pub fn count(&self) -> usize {
315 self.nodes.len()
316 }
317
318 pub fn contains(&self, id: NodeId) -> bool {
320 self.nodes.contains_key(&id)
321 }
322}
323
324impl Default for NodeRegistry {
325 fn default() -> Self {
326 Self::new()
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_node_id_creation() {
        let id1 = NodeId::new();
        let id2 = NodeId::new();
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_node_creation() {
        let addr = "127.0.0.1:7001".parse().unwrap();
        let node = Node::new("test-node", addr);
        assert_eq!(node.status(), NodeStatus::Starting);
    }

    #[test]
    fn test_node_status_transition() {
        let addr = "127.0.0.1:7001".parse().unwrap();
        let node = Node::new("test-node", addr);

        node.set_status(NodeStatus::Ready);
        assert_eq!(node.status(), NodeStatus::Ready);
        assert!(node.is_available());
    }

    #[test]
    fn test_node_status_display() {
        assert_eq!(NodeStatus::Ready.to_string(), "ready");
        assert_eq!(NodeStatus::Disconnected.to_string(), "disconnected");
    }

    #[test]
    fn test_overloaded_is_healthy_but_not_available() {
        let mut info = NodeInfo::new("test-node", "127.0.0.1:7001".parse().unwrap());

        info.status = NodeStatus::Overloaded;
        assert!(info.is_healthy());
        assert!(!info.is_available());

        info.status = NodeStatus::Failed;
        assert!(!info.is_healthy());
        assert!(!info.is_available());
    }

    #[test]
    fn test_node_load_factor() {
        let addr = "127.0.0.1:7001".parse().unwrap();
        let node = Node::new("test-node", addr);

        assert_eq!(node.load_factor(), 0.0);

        node.increment_active_tasks();
        assert!(node.load_factor() > 0.0);
    }

    #[test]
    fn test_decrement_active_tasks_floors_at_zero() {
        let node = Node::new("test-node", "127.0.0.1:7001".parse().unwrap());

        node.decrement_active_tasks();
        assert_eq!(node.info().resource_usage.active_tasks, 0);

        node.increment_active_tasks();
        node.increment_active_tasks();
        node.decrement_active_tasks();
        assert_eq!(node.info().resource_usage.active_tasks, 1);
    }

    #[test]
    fn test_node_registry() {
        let registry = NodeRegistry::new();
        let addr = "127.0.0.1:7001".parse().unwrap();
        let node = Node::new("test-node", addr);
        let id = node.id();

        assert!(registry.register(node.clone()).is_ok());
        assert_eq!(registry.count(), 1);
        assert!(registry.contains(id));

        let retrieved = registry.get(id).unwrap();
        assert_eq!(retrieved.id(), id);

        assert!(registry.unregister(id).is_ok());
        assert_eq!(registry.count(), 0);
    }

    #[test]
    fn test_duplicate_registration_rejected() {
        let registry = NodeRegistry::new();
        let node = Node::new("test-node", "127.0.0.1:7001".parse().unwrap());
        let id = node.id();

        registry.register(node.clone()).unwrap();
        assert!(registry.register(node).is_err());
        assert_eq!(registry.count(), 1);
        assert!(registry.contains(id));
    }

    #[test]
    fn test_unregister_unknown_node_fails() {
        let registry = NodeRegistry::new();
        assert!(registry.unregister(NodeId::new()).is_err());
        assert_eq!(registry.count(), 0);
    }

    #[test]
    fn test_node_availability_filter() {
        let registry = NodeRegistry::new();

        let node1 = Node::new("node1", "127.0.0.1:7001".parse().unwrap());
        node1.set_status(NodeStatus::Ready);

        let node2 = Node::new("node2", "127.0.0.1:7002".parse().unwrap());
        node2.set_status(NodeStatus::Disconnected);

        registry.register(node1).unwrap();
        registry.register(node2).unwrap();

        let available = registry.available();
        assert_eq!(available.len(), 1);
    }
}