// eshanized_polaris_core/node.rs

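//! Node model and lifecycle management.
//!
//! This module defines the cluster node model ([`NodeId`], [`NodeStatus`],
//! [`NodeInfo`]), the shared [`Node`] handle, and the [`NodeRegistry`] that
//! tracks registered nodes.
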
use crate::config::ResourceLimits;
use crate::errors::{PolarisError, PolarisResult};
use chrono::{DateTime, Utc};
use dashmap::mapref::entry::Entry;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use uuid::Uuid;

14/// Unique identifier for a node
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct NodeId(Uuid);
17
18impl NodeId {
19    /// Create a new random node ID
20    pub fn new() -> Self {
21        Self(Uuid::new_v4())
22    }
23
24    /// Get the inner UUID
25    pub fn as_uuid(&self) -> &Uuid {
26        &self.0
27    }
28}
29
30impl Default for NodeId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl fmt::Display for NodeId {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
42impl From<Uuid> for NodeId {
43    fn from(uuid: Uuid) -> Self {
44        Self(uuid)
45    }
46}
47
48/// Node status
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[non_exhaustive]
51pub enum NodeStatus {
52    /// Node is starting up
53    Starting,
54    /// Node is healthy and ready
55    Ready,
56    /// Node is busy but accepting work
57    Busy,
58    /// Node is overloaded
59    Overloaded,
60    /// Node is draining (no new tasks)
61    Draining,
62    /// Node is disconnected
63    Disconnected,
64    /// Node has failed
65    Failed,
66}
67
68impl fmt::Display for NodeStatus {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        match self {
71            Self::Starting => write!(f, "starting"),
72            Self::Ready => write!(f, "ready"),
73            Self::Busy => write!(f, "busy"),
74            Self::Overloaded => write!(f, "overloaded"),
75            Self::Draining => write!(f, "draining"),
76            Self::Disconnected => write!(f, "disconnected"),
77            Self::Failed => write!(f, "failed"),
78        }
79    }
80}
81
82/// Node information
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct NodeInfo {
85    /// Unique node identifier
86    pub id: NodeId,
87
88    /// Node name
89    pub name: String,
90
91    /// Node address
92    pub addr: SocketAddr,
93
94    /// Node status
95    pub status: NodeStatus,
96
97    /// Resource limits
98    pub resource_limits: ResourceLimits,
99
100    /// Current resource usage
101    pub resource_usage: ResourceUsage,
102
103    /// Node metadata
104    pub metadata: NodeMetadata,
105}
106
107impl NodeInfo {
108    /// Create new node info
109    pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
110        Self {
111            id: NodeId::new(),
112            name: name.into(),
113            addr,
114            status: NodeStatus::Starting,
115            resource_limits: ResourceLimits::default(),
116            resource_usage: ResourceUsage::default(),
117            metadata: NodeMetadata::default(),
118        }
119    }
120
121    /// Check if node is available for scheduling
122    pub fn is_available(&self) -> bool {
123        matches!(self.status, NodeStatus::Ready | NodeStatus::Busy)
124    }
125
126    /// Check if node is healthy
127    pub fn is_healthy(&self) -> bool {
128        matches!(
129            self.status,
130            NodeStatus::Ready | NodeStatus::Busy | NodeStatus::Overloaded
131        )
132    }
133
134    /// Calculate load factor (0.0 = idle, 1.0 = at limit)
135    pub fn load_factor(&self) -> f64 {
136        if self.resource_limits.max_concurrent_tasks == 0 {
137            return 0.0;
138        }
139        self.resource_usage.active_tasks as f64 / self.resource_limits.max_concurrent_tasks as f64
140    }
141}
142
143/// Current resource usage for a node
144#[derive(Debug, Clone, Default, Serialize, Deserialize)]
145pub struct ResourceUsage {
146    /// Number of active tasks
147    pub active_tasks: usize,
148
149    /// CPU usage percentage (0-100)
150    pub cpu_usage_percent: f64,
151
152    /// Memory usage in bytes
153    pub memory_usage_bytes: u64,
154
155    /// Network usage in bytes/sec
156    pub network_usage_bytes_per_sec: u64,
157}
158
159/// Node metadata
160#[derive(Debug, Clone, Default, Serialize, Deserialize)]
161pub struct NodeMetadata {
162    /// When the node started
163    pub started_at: Option<DateTime<Utc>>,
164
165    /// Last heartbeat timestamp
166    pub last_heartbeat: Option<DateTime<Utc>>,
167
168    /// Node version
169    pub version: Option<String>,
170
171    /// Operating system
172    pub os: Option<String>,
173
174    /// Architecture
175    pub arch: Option<String>,
176
177    /// Custom labels
178    pub labels: std::collections::HashMap<String, String>,
179}
180
181/// A node in the cluster
182#[derive(Clone)]
183pub struct Node {
184    info: Arc<parking_lot::RwLock<NodeInfo>>,
185}
186
187impl Node {
188    /// Create a new node
189    pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
190        Self {
191            info: Arc::new(parking_lot::RwLock::new(NodeInfo::new(name, addr))),
192        }
193    }
194
195    /// Get node ID
196    pub fn id(&self) -> NodeId {
197        self.info.read().id
198    }
199
200    /// Get node info (read-only copy)
201    pub fn info(&self) -> NodeInfo {
202        self.info.read().clone()
203    }
204
205    /// Get node status
206    pub fn status(&self) -> NodeStatus {
207        self.info.read().status
208    }
209
210    /// Update node status
211    pub fn set_status(&self, status: NodeStatus) {
212        self.info.write().status = status;
213    }
214
215    /// Check if node is available
216    pub fn is_available(&self) -> bool {
217        self.info.read().is_available()
218    }
219
220    /// Get load factor
221    pub fn load_factor(&self) -> f64 {
222        self.info.read().load_factor()
223    }
224
225    /// Update resource usage
226    pub fn update_usage(&self, usage: ResourceUsage) {
227        self.info.write().resource_usage = usage;
228    }
229
230    /// Record heartbeat
231    pub fn record_heartbeat(&self) {
232        self.info.write().metadata.last_heartbeat = Some(Utc::now());
233    }
234
235    /// Increment active task count
236    pub fn increment_active_tasks(&self) {
237        self.info.write().resource_usage.active_tasks += 1;
238    }
239
240    /// Decrement active task count
241    pub fn decrement_active_tasks(&self) {
242        let mut info = self.info.write();
243        if info.resource_usage.active_tasks > 0 {
244            info.resource_usage.active_tasks -= 1;
245        }
246    }
247}
248
249impl fmt::Debug for Node {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        let info = self.info.read();
252        f.debug_struct("Node")
253            .field("id", &info.id)
254            .field("name", &info.name)
255            .field("status", &info.status)
256            .finish()
257    }
258}
259
260/// Registry for managing cluster nodes
261#[derive(Debug, Clone)]
262pub struct NodeRegistry {
263    nodes: Arc<dashmap::DashMap<NodeId, Node>>,
264}
265
266impl NodeRegistry {
267    /// Create a new node registry
268    pub fn new() -> Self {
269        Self {
270            nodes: Arc::new(dashmap::DashMap::new()),
271        }
272    }
273
274    /// Register a node
275    pub fn register(&self, node: Node) -> PolarisResult<()> {
276        let id = node.id();
277        if self.nodes.contains_key(&id) {
278            return Err(PolarisError::other(format!("Node {} already registered", id)));
279        }
280        self.nodes.insert(id, node);
281        tracing::info!(node_id = %id, "Node registered");
282        Ok(())
283    }
284
285    /// Unregister a node
286    pub fn unregister(&self, id: NodeId) -> PolarisResult<()> {
287        self.nodes
288            .remove(&id)
289            .ok_or_else(|| PolarisError::NodeNotFound(id.to_string()))?;
290        tracing::info!(node_id = %id, "Node unregistered");
291        Ok(())
292    }
293
294    /// Get a node by ID
295    pub fn get(&self, id: NodeId) -> Option<Node> {
296        self.nodes.get(&id).map(|entry| entry.value().clone())
297    }
298
299    /// Get all nodes
300    pub fn all(&self) -> Vec<Node> {
301        self.nodes.iter().map(|entry| entry.value().clone()).collect()
302    }
303
304    /// Get available nodes
305    pub fn available(&self) -> Vec<Node> {
306        self.nodes
307            .iter()
308            .map(|entry| entry.value().clone())
309            .filter(|node| node.is_available())
310            .collect()
311    }
312
313    /// Get node count
314    pub fn count(&self) -> usize {
315        self.nodes.len()
316    }
317
318    /// Check if a node exists
319    pub fn contains(&self, id: NodeId) -> bool {
320        self.nodes.contains_key(&id)
321    }
322}
323
324impl Default for NodeRegistry {
325    fn default() -> Self {
326        Self::new()
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Loopback socket address on `port`, used as a dummy node address.
    fn local_addr(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    /// Build a node named `name` listening on loopback `port`.
    fn make_node(name: &str, port: u16) -> Node {
        Node::new(name, local_addr(port))
    }

    #[test]
    fn test_node_id_creation() {
        let (first, second) = (NodeId::new(), NodeId::new());
        assert_ne!(first, second);
    }

    #[test]
    fn test_node_creation() {
        let node = make_node("test-node", 7001);
        assert_eq!(node.status(), NodeStatus::Starting);
    }

    #[test]
    fn test_node_status_transition() {
        let node = make_node("test-node", 7001);

        node.set_status(NodeStatus::Ready);
        assert_eq!(node.status(), NodeStatus::Ready);
        assert!(node.is_available());
    }

    #[test]
    fn test_node_load_factor() {
        let node = make_node("test-node", 7001);

        assert_eq!(node.load_factor(), 0.0);

        node.increment_active_tasks();
        assert!(node.load_factor() > 0.0);
    }

    #[test]
    fn test_node_registry() {
        let registry = NodeRegistry::new();
        let node = make_node("test-node", 7001);
        let id = node.id();

        assert!(registry.register(node.clone()).is_ok());
        assert_eq!(registry.count(), 1);
        assert!(registry.contains(id));

        let fetched = registry
            .get(id)
            .expect("a just-registered node must be retrievable");
        assert_eq!(fetched.id(), id);

        assert!(registry.unregister(id).is_ok());
        assert_eq!(registry.count(), 0);
    }

    #[test]
    fn test_node_availability_filter() {
        let registry = NodeRegistry::new();

        let fixtures = [
            ("node1", 7001, NodeStatus::Ready),
            ("node2", 7002, NodeStatus::Disconnected),
        ];
        for (name, port, status) in fixtures {
            let node = make_node(name, port);
            node.set_status(status);
            registry.register(node).unwrap();
        }

        assert_eq!(registry.available().len(), 1);
    }
}