Skip to main content

aegis_replication/
node.rs

1//! Aegis Replication Node
2//!
3//! Node identification and management for distributed clusters.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::net::SocketAddr;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12// =============================================================================
13// Node ID
14// =============================================================================
15
16/// Unique identifier for a node in the cluster.
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct NodeId(pub String);
19
20impl NodeId {
21    pub fn new(id: impl Into<String>) -> Self {
22        Self(id.into())
23    }
24
25    pub fn generate() -> Self {
26        let timestamp = SystemTime::now()
27            .duration_since(UNIX_EPOCH)
28            .unwrap_or_default()
29            .as_nanos();
30        Self(format!("node_{:016x}", timestamp as u64))
31    }
32
33    pub fn as_str(&self) -> &str {
34        &self.0
35    }
36}
37
38impl std::fmt::Display for NodeId {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl From<String> for NodeId {
45    fn from(s: String) -> Self {
46        Self(s)
47    }
48}
49
50impl From<&str> for NodeId {
51    fn from(s: &str) -> Self {
52        Self(s.to_string())
53    }
54}
55
56// =============================================================================
57// Node Status
58// =============================================================================
59
60/// Status of a node in the cluster.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62#[derive(Default)]
63pub enum NodeStatus {
64    /// Node is starting up.
65    #[default]
66    Starting,
67    /// Node is healthy and operational.
68    Healthy,
69    /// Node is suspected to be failing.
70    Suspect,
71    /// Node is confirmed down.
72    Down,
73    /// Node is being removed from cluster.
74    Leaving,
75    /// Node has left the cluster.
76    Left,
77}
78
79
80// =============================================================================
81// Node Role
82// =============================================================================
83
84/// Role of a node in the Raft cluster.
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
86#[derive(Default)]
87pub enum NodeRole {
88    /// Follower node that replicates from leader.
89    #[default]
90    Follower,
91    /// Candidate node during leader election.
92    Candidate,
93    /// Leader node that handles writes.
94    Leader,
95}
96
97
98// =============================================================================
99// Node Info
100// =============================================================================
101
102/// Information about a node in the cluster.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct NodeInfo {
105    pub id: NodeId,
106    pub address: String,
107    pub port: u16,
108    pub status: NodeStatus,
109    pub role: NodeRole,
110    pub joined_at: u64,
111    pub last_heartbeat: u64,
112    pub metadata: NodeMetadata,
113}
114
115impl NodeInfo {
116    /// Create a new node info.
117    pub fn new(id: impl Into<NodeId>, address: impl Into<String>, port: u16) -> Self {
118        let now = current_timestamp();
119        Self {
120            id: id.into(),
121            address: address.into(),
122            port,
123            status: NodeStatus::Starting,
124            role: NodeRole::Follower,
125            joined_at: now,
126            last_heartbeat: now,
127            metadata: NodeMetadata::default(),
128        }
129    }
130
131    /// Get the socket address.
132    pub fn socket_addr(&self) -> Option<SocketAddr> {
133        format!("{}:{}", self.address, self.port).parse().ok()
134    }
135
136    /// Update the heartbeat timestamp.
137    pub fn heartbeat(&mut self) {
138        self.last_heartbeat = current_timestamp();
139        if self.status == NodeStatus::Suspect {
140            self.status = NodeStatus::Healthy;
141        }
142    }
143
144    /// Mark the node as healthy.
145    pub fn mark_healthy(&mut self) {
146        self.status = NodeStatus::Healthy;
147        self.heartbeat();
148    }
149
150    /// Mark the node as suspect.
151    pub fn mark_suspect(&mut self) {
152        if self.status == NodeStatus::Healthy {
153            self.status = NodeStatus::Suspect;
154        }
155    }
156
157    /// Mark the node as down.
158    pub fn mark_down(&mut self) {
159        self.status = NodeStatus::Down;
160    }
161
162    /// Check if the node is available for requests.
163    pub fn is_available(&self) -> bool {
164        matches!(self.status, NodeStatus::Healthy)
165    }
166
167    /// Check if the node is the leader.
168    pub fn is_leader(&self) -> bool {
169        self.role == NodeRole::Leader
170    }
171
172    /// Time since last heartbeat in milliseconds.
173    pub fn heartbeat_age(&self) -> u64 {
174        current_timestamp().saturating_sub(self.last_heartbeat)
175    }
176}
177
178// =============================================================================
179// Node Metadata
180// =============================================================================
181
182/// Additional metadata about a node.
183#[derive(Debug, Clone, Default, Serialize, Deserialize)]
184pub struct NodeMetadata {
185    pub datacenter: Option<String>,
186    pub rack: Option<String>,
187    pub zone: Option<String>,
188    pub tags: Vec<String>,
189    pub capacity: Option<NodeCapacity>,
190}
191
192impl NodeMetadata {
193    pub fn with_datacenter(mut self, dc: impl Into<String>) -> Self {
194        self.datacenter = Some(dc.into());
195        self
196    }
197
198    pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
199        self.rack = Some(rack.into());
200        self
201    }
202
203    pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
204        self.zone = Some(zone.into());
205        self
206    }
207
208    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
209        self.tags.push(tag.into());
210        self
211    }
212}
213
214// =============================================================================
215// Node Capacity
216// =============================================================================
217
218/// Capacity information for a node.
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct NodeCapacity {
221    pub cpu_cores: u32,
222    pub memory_mb: u64,
223    pub storage_gb: u64,
224    pub network_mbps: u32,
225}
226
227impl NodeCapacity {
228    pub fn new(cpu_cores: u32, memory_mb: u64, storage_gb: u64) -> Self {
229        Self {
230            cpu_cores,
231            memory_mb,
232            storage_gb,
233            network_mbps: 1000,
234        }
235    }
236}
237
238// =============================================================================
239// Node Health
240// =============================================================================
241
242/// Health check result for a node.
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct NodeHealth {
245    pub node_id: NodeId,
246    pub healthy: bool,
247    pub cpu_usage: f64,
248    pub memory_usage: f64,
249    pub disk_usage: f64,
250    pub latency_ms: u64,
251    pub checked_at: u64,
252}
253
254impl NodeHealth {
255    pub fn healthy(node_id: NodeId) -> Self {
256        Self {
257            node_id,
258            healthy: true,
259            cpu_usage: 0.0,
260            memory_usage: 0.0,
261            disk_usage: 0.0,
262            latency_ms: 0,
263            checked_at: current_timestamp(),
264        }
265    }
266
267    pub fn unhealthy(node_id: NodeId) -> Self {
268        Self {
269            node_id,
270            healthy: false,
271            cpu_usage: 0.0,
272            memory_usage: 0.0,
273            disk_usage: 0.0,
274            latency_ms: 0,
275            checked_at: current_timestamp(),
276        }
277    }
278}
279
280fn current_timestamp() -> u64 {
281    SystemTime::now()
282        .duration_since(UNIX_EPOCH)
283        .map(|d| d.as_millis() as u64)
284        .unwrap_or(0)
285}
286
287// =============================================================================
288// Tests
289// =============================================================================
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294
295    #[test]
296    fn test_node_id() {
297        let id1 = NodeId::generate();
298        let id2 = NodeId::generate();
299        assert_ne!(id1, id2);
300        assert!(id1.as_str().starts_with("node_"));
301    }
302
303    #[test]
304    fn test_node_info() {
305        let mut node = NodeInfo::new("node1", "127.0.0.1", 5432);
306
307        assert_eq!(node.status, NodeStatus::Starting);
308        assert_eq!(node.role, NodeRole::Follower);
309
310        node.mark_healthy();
311        assert_eq!(node.status, NodeStatus::Healthy);
312        assert!(node.is_available());
313
314        node.mark_suspect();
315        assert_eq!(node.status, NodeStatus::Suspect);
316
317        node.heartbeat();
318        assert_eq!(node.status, NodeStatus::Healthy);
319    }
320
321    #[test]
322    fn test_socket_addr() {
323        let node = NodeInfo::new("node1", "127.0.0.1", 5432);
324        let addr = node.socket_addr().unwrap();
325        assert_eq!(addr.port(), 5432);
326    }
327
328    #[test]
329    fn test_node_metadata() {
330        let metadata = NodeMetadata::default()
331            .with_datacenter("us-east-1")
332            .with_zone("a")
333            .with_tag("production");
334
335        assert_eq!(metadata.datacenter, Some("us-east-1".to_string()));
336        assert_eq!(metadata.zone, Some("a".to_string()));
337        assert!(metadata.tags.contains(&"production".to_string()));
338    }
339
340    #[test]
341    fn test_node_health() {
342        let health = NodeHealth::healthy(NodeId::new("node1"));
343        assert!(health.healthy);
344
345        let unhealthy = NodeHealth::unhealthy(NodeId::new("node2"));
346        assert!(!unhealthy.healthy);
347    }
348}