forge_core/cluster/
node.rs

1use std::net::IpAddr;
2use std::str::FromStr;
3
4use chrono::{DateTime, Utc};
5use uuid::Uuid;
6
7use super::roles::NodeRole;
8
9/// Unique node identifier.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct NodeId(pub Uuid);
12
13impl NodeId {
14    /// Generate a new random node ID.
15    pub fn new() -> Self {
16        Self(Uuid::new_v4())
17    }
18
19    /// Create from an existing UUID.
20    pub fn from_uuid(id: Uuid) -> Self {
21        Self(id)
22    }
23
24    /// Get the inner UUID.
25    pub fn as_uuid(&self) -> Uuid {
26        self.0
27    }
28}
29
30impl Default for NodeId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl std::fmt::Display for NodeId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Lifecycle state of a node within the cluster.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeStatus {
    /// The node is still starting up.
    Joining,
    /// The node is healthy and active.
    Active,
    /// The node is shutting down gracefully.
    Draining,
    /// The node has stopped sending heartbeats.
    Dead,
}
54
55impl NodeStatus {
56    /// Convert to string for database storage.
57    pub fn as_str(&self) -> &'static str {
58        match self {
59            Self::Joining => "joining",
60            Self::Active => "active",
61            Self::Draining => "draining",
62            Self::Dead => "dead",
63        }
64    }
65
66    /// Check if node can accept new work.
67    pub fn can_accept_work(&self) -> bool {
68        matches!(self, Self::Active)
69    }
70}
71
72impl FromStr for NodeStatus {
73    type Err = std::convert::Infallible;
74
75    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
76        Ok(match s {
77            "joining" => Self::Joining,
78            "active" => Self::Active,
79            "draining" => Self::Draining,
80            "dead" => Self::Dead,
81            _ => Self::Dead,
82        })
83    }
84}
85
86/// Information about a node in the cluster.
87#[derive(Debug, Clone)]
88pub struct NodeInfo {
89    /// Unique node ID.
90    pub id: NodeId,
91    /// Hostname.
92    pub hostname: String,
93    /// IP address.
94    pub ip_address: IpAddr,
95    /// HTTP port.
96    pub http_port: u16,
97    /// gRPC port for inter-node communication.
98    pub grpc_port: u16,
99    /// Enabled roles.
100    pub roles: Vec<NodeRole>,
101    /// Worker capabilities.
102    pub worker_capabilities: Vec<String>,
103    /// Current status.
104    pub status: NodeStatus,
105    /// Last heartbeat time.
106    pub last_heartbeat: DateTime<Utc>,
107    /// Version string.
108    pub version: String,
109    /// When the node started.
110    pub started_at: DateTime<Utc>,
111    /// Current connection count.
112    pub current_connections: u32,
113    /// Current job count.
114    pub current_jobs: u32,
115    /// CPU usage percentage.
116    pub cpu_usage: f32,
117    /// Memory usage percentage.
118    pub memory_usage: f32,
119}
120
121impl NodeInfo {
122    /// Create a new node info for the local node.
123    pub fn new_local(
124        hostname: String,
125        ip_address: IpAddr,
126        http_port: u16,
127        grpc_port: u16,
128        roles: Vec<NodeRole>,
129        worker_capabilities: Vec<String>,
130        version: String,
131    ) -> Self {
132        Self {
133            id: NodeId::new(),
134            hostname,
135            ip_address,
136            http_port,
137            grpc_port,
138            roles,
139            worker_capabilities,
140            status: NodeStatus::Joining,
141            last_heartbeat: Utc::now(),
142            version,
143            started_at: Utc::now(),
144            current_connections: 0,
145            current_jobs: 0,
146            cpu_usage: 0.0,
147            memory_usage: 0.0,
148        }
149    }
150
151    /// Check if this node has a specific role.
152    pub fn has_role(&self, role: NodeRole) -> bool {
153        self.roles.contains(&role)
154    }
155
156    /// Check if this node has a specific worker capability.
157    pub fn has_capability(&self, capability: &str) -> bool {
158        self.worker_capabilities.iter().any(|c| c == capability)
159    }
160
161    /// Calculate node load (0.0 to 1.0).
162    pub fn load(&self) -> f32 {
163        // Simple average of CPU and memory
164        (self.cpu_usage + self.memory_usage) / 2.0 / 100.0
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Build a local node on the loopback address with the ports and version
    /// shared by the tests below.
    fn loopback_node(
        hostname: &str,
        roles: Vec<NodeRole>,
        capabilities: Vec<String>,
    ) -> NodeInfo {
        NodeInfo::new_local(
            hostname.to_string(),
            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            8080,
            9000,
            roles,
            capabilities,
            "0.1.0".to_string(),
        )
    }

    /// Two freshly generated IDs must not compare equal.
    #[test]
    fn test_node_id_generation() {
        let first = NodeId::new();
        let second = NodeId::new();
        assert_ne!(first, second);
    }

    /// Status names parse to the expected variants and `as_str` gives the name back.
    #[test]
    fn test_node_status_conversion() {
        let cases = [
            ("active", NodeStatus::Active),
            ("draining", NodeStatus::Draining),
        ];
        for &(text, expected) in &cases {
            assert_eq!(text.parse::<NodeStatus>(), Ok(expected));
        }
        assert_eq!(NodeStatus::Active.as_str(), "active");
    }

    /// Only an active node may take on new work.
    #[test]
    fn test_node_can_accept_work() {
        assert!(NodeStatus::Active.can_accept_work());
        for status in &[NodeStatus::Draining, NodeStatus::Dead] {
            assert!(!status.can_accept_work());
        }
    }

    /// Roles and capabilities given at construction are reported back; others are not.
    #[test]
    fn test_node_info_creation() {
        let node = loopback_node(
            "test-node",
            vec![NodeRole::Gateway, NodeRole::Worker],
            vec!["general".to_string()],
        );

        assert!(node.has_role(NodeRole::Gateway));
        assert!(node.has_role(NodeRole::Worker));
        assert!(!node.has_role(NodeRole::Scheduler));

        assert!(node.has_capability("general"));
        assert!(!node.has_capability("media"));
    }

    /// 50% CPU and 30% memory average out to a load of 0.4.
    #[test]
    fn test_node_load_calculation() {
        let mut node = loopback_node("test", Vec::new(), Vec::new());
        node.cpu_usage = 50.0;
        node.memory_usage = 30.0;

        let deviation = (node.load() - 0.4).abs();
        assert!(deviation < 0.001);
    }
}