Skip to main content

forge_core/cluster/
node.rs

1use std::net::IpAddr;
2use std::str::FromStr;
3
4use chrono::{DateTime, Utc};
5use uuid::Uuid;
6
7use super::roles::NodeRole;
8
9/// Unique node identifier.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct NodeId(pub Uuid);
12
13impl NodeId {
14    pub fn new() -> Self {
15        Self(Uuid::new_v4())
16    }
17
18    pub fn from_uuid(id: Uuid) -> Self {
19        Self(id)
20    }
21
22    pub fn as_uuid(&self) -> Uuid {
23        self.0
24    }
25}
26
27impl Default for NodeId {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl std::fmt::Display for NodeId {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "{}", self.0)
36    }
37}
38
39/// Node status in the cluster.
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41#[non_exhaustive]
42pub enum NodeStatus {
43    /// Node is starting up.
44    Joining,
45    /// Node is healthy and active.
46    Active,
47    /// Node is shutting down gracefully.
48    Draining,
49    /// Node has stopped sending heartbeats.
50    Dead,
51}
52
53impl NodeStatus {
54    pub fn as_str(&self) -> &'static str {
55        match self {
56            Self::Joining => "joining",
57            Self::Active => "active",
58            Self::Draining => "draining",
59            Self::Dead => "dead",
60        }
61    }
62
63    pub fn can_accept_work(&self) -> bool {
64        matches!(self, Self::Active)
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct ParseNodeStatusError(pub String);
70
71impl std::fmt::Display for ParseNodeStatusError {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        write!(f, "invalid node status: '{}'", self.0)
74    }
75}
76
77impl std::error::Error for ParseNodeStatusError {}
78
79impl FromStr for NodeStatus {
80    type Err = ParseNodeStatusError;
81
82    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
83        match s {
84            "joining" => Ok(Self::Joining),
85            "active" => Ok(Self::Active),
86            "draining" => Ok(Self::Draining),
87            "dead" => Ok(Self::Dead),
88            _ => Err(ParseNodeStatusError(s.to_string())),
89        }
90    }
91}
92
93/// Information about a node in the cluster.
94#[derive(Debug, Clone)]
95pub struct NodeInfo {
96    pub id: NodeId,
97    pub hostname: String,
98    pub ip_address: IpAddr,
99    pub http_port: u16,
100    pub grpc_port: u16,
101    pub roles: Vec<NodeRole>,
102    pub worker_capabilities: Vec<String>,
103    pub status: NodeStatus,
104    pub last_heartbeat: DateTime<Utc>,
105    pub version: String,
106    pub started_at: DateTime<Utc>,
107    pub current_connections: u32,
108    pub current_jobs: u32,
109    pub cpu_usage: f32,
110    pub memory_usage: f32,
111}
112
113impl NodeInfo {
114    pub fn new_local(
115        hostname: String,
116        ip_address: IpAddr,
117        http_port: u16,
118        grpc_port: u16,
119        roles: Vec<NodeRole>,
120        worker_capabilities: Vec<String>,
121        version: String,
122    ) -> Self {
123        Self {
124            id: NodeId::new(),
125            hostname,
126            ip_address,
127            http_port,
128            grpc_port,
129            roles,
130            worker_capabilities,
131            status: NodeStatus::Joining,
132            last_heartbeat: Utc::now(),
133            version,
134            started_at: Utc::now(),
135            current_connections: 0,
136            current_jobs: 0,
137            cpu_usage: 0.0,
138            memory_usage: 0.0,
139        }
140    }
141
142    pub fn has_role(&self, role: NodeRole) -> bool {
143        self.roles.contains(&role)
144    }
145
146    pub fn has_capability(&self, capability: &str) -> bool {
147        self.worker_capabilities.iter().any(|c| c == capability)
148    }
149
150    /// Average of CPU and memory, normalized to 0.0–1.0.
151    pub fn load(&self) -> f32 {
152        (self.cpu_usage + self.memory_usage) / 2.0 / 100.0
153    }
154}
155
156#[cfg(test)]
157#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
158mod tests {
159    use super::*;
160    use std::net::Ipv4Addr;
161
162    #[test]
163    fn test_node_id_generation() {
164        let id1 = NodeId::new();
165        let id2 = NodeId::new();
166        assert_ne!(id1, id2);
167    }
168
169    #[test]
170    fn test_node_status_conversion() {
171        assert_eq!("active".parse::<NodeStatus>(), Ok(NodeStatus::Active));
172        assert_eq!("draining".parse::<NodeStatus>(), Ok(NodeStatus::Draining));
173        assert!("invalid".parse::<NodeStatus>().is_err());
174        assert_eq!(NodeStatus::Active.as_str(), "active");
175    }
176
177    #[test]
178    fn test_node_can_accept_work() {
179        assert!(NodeStatus::Active.can_accept_work());
180        assert!(!NodeStatus::Draining.can_accept_work());
181        assert!(!NodeStatus::Dead.can_accept_work());
182    }
183
184    #[test]
185    fn test_node_info_creation() {
186        let info = NodeInfo::new_local(
187            "test-node".to_string(),
188            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
189            8080,
190            9000,
191            vec![NodeRole::Gateway, NodeRole::Worker],
192            vec!["general".to_string()],
193            "0.1.0".to_string(),
194        );
195
196        assert!(info.has_role(NodeRole::Gateway));
197        assert!(info.has_role(NodeRole::Worker));
198        assert!(!info.has_role(NodeRole::Scheduler));
199        assert!(info.has_capability("general"));
200        assert!(!info.has_capability("media"));
201    }
202
203    #[test]
204    fn test_node_load_calculation() {
205        let mut info = NodeInfo::new_local(
206            "test".to_string(),
207            IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
208            8080,
209            9000,
210            vec![],
211            vec![],
212            "0.1.0".to_string(),
213        );
214        info.cpu_usage = 50.0;
215        info.memory_usage = 30.0;
216        assert!((info.load() - 0.4).abs() < 0.001);
217    }
218}