Skip to main content

forge_core/cluster/
node.rs

1use std::net::IpAddr;
2use std::str::FromStr;
3
4use chrono::{DateTime, Utc};
5use uuid::Uuid;
6
7use super::roles::NodeRole;
8
9/// Unique node identifier.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct NodeId(pub Uuid);
12
13impl NodeId {
14    /// Generate a new random node ID.
15    pub fn new() -> Self {
16        Self(Uuid::new_v4())
17    }
18
19    /// Create from an existing UUID.
20    pub fn from_uuid(id: Uuid) -> Self {
21        Self(id)
22    }
23
24    /// Get the inner UUID.
25    pub fn as_uuid(&self) -> Uuid {
26        self.0
27    }
28}
29
30impl Default for NodeId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl std::fmt::Display for NodeId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Lifecycle state of a node as tracked by the cluster.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    /// Node is starting up.
    Joining,
    /// Node is healthy and active.
    Active,
    /// Node is shutting down gracefully.
    Draining,
    /// Node has stopped sending heartbeats.
    Dead,
}

impl NodeStatus {
    /// Returns the lowercase name used when the status is stored in the database.
    ///
    /// Parsing via `str::parse::<NodeStatus>()` accepts exactly these strings, so
    /// `as_str` and `from_str` round-trip.
    pub fn as_str(&self) -> &'static str {
        match *self {
            NodeStatus::Joining => "joining",
            NodeStatus::Active => "active",
            NodeStatus::Draining => "draining",
            NodeStatus::Dead => "dead",
        }
    }

    /// Returns `true` only for [`NodeStatus::Active`]. Joining, draining and dead
    /// nodes are not eligible for new work.
    pub fn can_accept_work(&self) -> bool {
        *self == NodeStatus::Active
    }
}

/// Error produced when a string is not one of the names returned by
/// [`NodeStatus::as_str`]. The rejected input is kept verbatim in field `0`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseNodeStatusError(pub String);

impl std::fmt::Display for ParseNodeStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rejected = &self.0;
        write!(f, "invalid node status: '{}'", rejected)
    }
}

impl std::error::Error for ParseNodeStatusError {}

impl FromStr for NodeStatus {
    type Err = ParseNodeStatusError;

    /// Parses a stored status name.
    ///
    /// Matching is exact and case-sensitive; surrounding whitespace is not trimmed.
    ///
    /// # Errors
    ///
    /// Returns [`ParseNodeStatusError`] holding `s` when it matches no variant's name.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Every variant, so lookup goes through `as_str` and the names live in one
        // place. Keep this list in sync when variants are added.
        const ALL: [NodeStatus; 4] = [
            NodeStatus::Joining,
            NodeStatus::Active,
            NodeStatus::Draining,
            NodeStatus::Dead,
        ];
        ALL.iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .ok_or_else(|| ParseNodeStatusError(s.to_owned()))
    }
}
96
97/// Information about a node in the cluster.
98#[derive(Debug, Clone)]
99pub struct NodeInfo {
100    /// Unique node ID.
101    pub id: NodeId,
102    /// Hostname.
103    pub hostname: String,
104    /// IP address.
105    pub ip_address: IpAddr,
106    /// HTTP port.
107    pub http_port: u16,
108    /// gRPC port for inter-node communication.
109    pub grpc_port: u16,
110    /// Enabled roles.
111    pub roles: Vec<NodeRole>,
112    /// Worker capabilities.
113    pub worker_capabilities: Vec<String>,
114    /// Current status.
115    pub status: NodeStatus,
116    /// Last heartbeat time.
117    pub last_heartbeat: DateTime<Utc>,
118    /// Version string.
119    pub version: String,
120    /// When the node started.
121    pub started_at: DateTime<Utc>,
122    /// Current connection count.
123    pub current_connections: u32,
124    /// Current job count.
125    pub current_jobs: u32,
126    /// CPU usage percentage.
127    pub cpu_usage: f32,
128    /// Memory usage percentage.
129    pub memory_usage: f32,
130}
131
132impl NodeInfo {
133    /// Create a new node info for the local node.
134    pub fn new_local(
135        hostname: String,
136        ip_address: IpAddr,
137        http_port: u16,
138        grpc_port: u16,
139        roles: Vec<NodeRole>,
140        worker_capabilities: Vec<String>,
141        version: String,
142    ) -> Self {
143        Self {
144            id: NodeId::new(),
145            hostname,
146            ip_address,
147            http_port,
148            grpc_port,
149            roles,
150            worker_capabilities,
151            status: NodeStatus::Joining,
152            last_heartbeat: Utc::now(),
153            version,
154            started_at: Utc::now(),
155            current_connections: 0,
156            current_jobs: 0,
157            cpu_usage: 0.0,
158            memory_usage: 0.0,
159        }
160    }
161
162    /// Check if this node has a specific role.
163    pub fn has_role(&self, role: NodeRole) -> bool {
164        self.roles.contains(&role)
165    }
166
167    /// Check if this node has a specific worker capability.
168    pub fn has_capability(&self, capability: &str) -> bool {
169        self.worker_capabilities.iter().any(|c| c == capability)
170    }
171
172    /// Calculate node load (0.0 to 1.0).
173    pub fn load(&self) -> f32 {
174        // Simple average of CPU and memory
175        (self.cpu_usage + self.memory_usage) / 2.0 / 100.0
176    }
177}
178
#[cfg(test)]
// Test code may unwrap and index freely: a panic is simply a test failure.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Builds a loopback `NodeInfo` with the given roles and capabilities.
    fn local_node(roles: Vec<NodeRole>, capabilities: Vec<String>) -> NodeInfo {
        NodeInfo::new_local(
            "test-node".to_string(),
            IpAddr::V4(Ipv4Addr::LOCALHOST),
            8080,
            9000,
            roles,
            capabilities,
            "0.1.0".to_string(),
        )
    }

    #[test]
    fn test_node_id_generation() {
        let id1 = NodeId::new();
        let id2 = NodeId::new();
        assert_ne!(id1, id2);
    }

    /// `from_uuid` / `as_uuid` round-trip, and `Display` matches the inner UUID.
    #[test]
    fn test_node_id_uuid_round_trip_and_display() {
        let id = NodeId::new();
        assert_eq!(NodeId::from_uuid(id.as_uuid()), id);
        assert_eq!(id.to_string(), id.as_uuid().to_string());
    }

    #[test]
    fn test_node_status_conversion() {
        assert_eq!("active".parse::<NodeStatus>(), Ok(NodeStatus::Active));
        assert_eq!("draining".parse::<NodeStatus>(), Ok(NodeStatus::Draining));
        assert!("invalid".parse::<NodeStatus>().is_err());
        assert_eq!(NodeStatus::Active.as_str(), "active");
    }

    /// Every variant's database name parses back to the same variant.
    #[test]
    fn test_node_status_round_trip() {
        let all = [
            NodeStatus::Joining,
            NodeStatus::Active,
            NodeStatus::Draining,
            NodeStatus::Dead,
        ];
        for status in all.iter().copied() {
            assert_eq!(status.as_str().parse::<NodeStatus>(), Ok(status));
        }
    }

    /// Parsing is case-sensitive and does not trim; the error carries the input.
    #[test]
    fn test_node_status_parse_error() {
        let err = "ACTIVE".parse::<NodeStatus>().unwrap_err();
        assert_eq!(err.0, "ACTIVE");
        assert_eq!(err.to_string(), "invalid node status: 'ACTIVE'");
        assert!(" active".parse::<NodeStatus>().is_err());
        assert!("".parse::<NodeStatus>().is_err());
    }

    #[test]
    fn test_node_can_accept_work() {
        assert!(NodeStatus::Active.can_accept_work());
        assert!(!NodeStatus::Joining.can_accept_work());
        assert!(!NodeStatus::Draining.can_accept_work());
        assert!(!NodeStatus::Dead.can_accept_work());
    }

    #[test]
    fn test_node_info_creation() {
        let info = local_node(
            vec![NodeRole::Gateway, NodeRole::Worker],
            vec!["general".to_string()],
        );

        assert!(info.has_role(NodeRole::Gateway));
        assert!(info.has_role(NodeRole::Worker));
        assert!(!info.has_role(NodeRole::Scheduler));
        assert!(info.has_capability("general"));
        assert!(!info.has_capability("media"));
    }

    /// A freshly created local node starts joining, idle, and not yet eligible for work.
    #[test]
    fn test_node_info_initial_state() {
        let info = local_node(vec![], vec![]);
        assert_eq!(info.status, NodeStatus::Joining);
        assert!(!info.status.can_accept_work());
        assert_eq!(info.current_connections, 0);
        assert_eq!(info.current_jobs, 0);
        assert_eq!(info.load(), 0.0);
        assert!(info.started_at <= Utc::now());
    }

    #[test]
    fn test_node_load_calculation() {
        let mut info = local_node(vec![], vec![]);
        info.cpu_usage = 50.0;
        info.memory_usage = 30.0;
        assert!((info.load() - 0.4).abs() < 0.001);
    }
}