Skip to main content

aegis_replication/
node.rs

1//! Aegis Replication Node
2//!
3//! Node identification and management for distributed clusters.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::net::SocketAddr;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12// =============================================================================
13// Node ID
14// =============================================================================
15
16/// Unique identifier for a node in the cluster.
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct NodeId(pub String);
19
20impl NodeId {
21    pub fn new(id: impl Into<String>) -> Self {
22        Self(id.into())
23    }
24
25    pub fn generate() -> Self {
26        let timestamp = SystemTime::now()
27            .duration_since(UNIX_EPOCH)
28            .unwrap_or_default()
29            .as_nanos();
30        Self(format!("node_{:016x}", timestamp as u64))
31    }
32
33    pub fn as_str(&self) -> &str {
34        &self.0
35    }
36}
37
38impl std::fmt::Display for NodeId {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl From<String> for NodeId {
45    fn from(s: String) -> Self {
46        Self(s)
47    }
48}
49
50impl From<&str> for NodeId {
51    fn from(s: &str) -> Self {
52        Self(s.to_string())
53    }
54}
55
56// =============================================================================
57// Node Status
58// =============================================================================
59
60/// Status of a node in the cluster.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
62pub enum NodeStatus {
63    /// Node is starting up.
64    #[default]
65    Starting,
66    /// Node is healthy and operational.
67    Healthy,
68    /// Node is suspected to be failing.
69    Suspect,
70    /// Node is confirmed down.
71    Down,
72    /// Node is being removed from cluster.
73    Leaving,
74    /// Node has left the cluster.
75    Left,
76}
77
78// =============================================================================
79// Node Role
80// =============================================================================
81
82/// Role of a node in the Raft cluster.
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
84pub enum NodeRole {
85    /// Follower node that replicates from leader.
86    #[default]
87    Follower,
88    /// Candidate node during leader election.
89    Candidate,
90    /// Leader node that handles writes.
91    Leader,
92}
93
94// =============================================================================
95// Node Info
96// =============================================================================
97
98/// Information about a node in the cluster.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct NodeInfo {
101    pub id: NodeId,
102    pub address: String,
103    pub port: u16,
104    pub status: NodeStatus,
105    pub role: NodeRole,
106    pub joined_at: u64,
107    pub last_heartbeat: u64,
108    pub metadata: NodeMetadata,
109}
110
111impl NodeInfo {
112    /// Create a new node info.
113    pub fn new(id: impl Into<NodeId>, address: impl Into<String>, port: u16) -> Self {
114        let now = current_timestamp();
115        Self {
116            id: id.into(),
117            address: address.into(),
118            port,
119            status: NodeStatus::Starting,
120            role: NodeRole::Follower,
121            joined_at: now,
122            last_heartbeat: now,
123            metadata: NodeMetadata::default(),
124        }
125    }
126
127    /// Get the socket address.
128    pub fn socket_addr(&self) -> Option<SocketAddr> {
129        format!("{}:{}", self.address, self.port).parse().ok()
130    }
131
132    /// Update the heartbeat timestamp.
133    pub fn heartbeat(&mut self) {
134        self.last_heartbeat = current_timestamp();
135        if self.status == NodeStatus::Suspect {
136            self.status = NodeStatus::Healthy;
137        }
138    }
139
140    /// Mark the node as healthy.
141    pub fn mark_healthy(&mut self) {
142        self.status = NodeStatus::Healthy;
143        self.heartbeat();
144    }
145
146    /// Mark the node as suspect.
147    pub fn mark_suspect(&mut self) {
148        if self.status == NodeStatus::Healthy {
149            self.status = NodeStatus::Suspect;
150        }
151    }
152
153    /// Mark the node as down.
154    pub fn mark_down(&mut self) {
155        self.status = NodeStatus::Down;
156    }
157
158    /// Check if the node is available for requests.
159    pub fn is_available(&self) -> bool {
160        matches!(self.status, NodeStatus::Healthy)
161    }
162
163    /// Check if the node is the leader.
164    pub fn is_leader(&self) -> bool {
165        self.role == NodeRole::Leader
166    }
167
168    /// Time since last heartbeat in milliseconds.
169    pub fn heartbeat_age(&self) -> u64 {
170        current_timestamp().saturating_sub(self.last_heartbeat)
171    }
172}
173
174// =============================================================================
175// Node Metadata
176// =============================================================================
177
178/// Additional metadata about a node.
179#[derive(Debug, Clone, Default, Serialize, Deserialize)]
180pub struct NodeMetadata {
181    pub datacenter: Option<String>,
182    pub rack: Option<String>,
183    pub zone: Option<String>,
184    pub tags: Vec<String>,
185    pub capacity: Option<NodeCapacity>,
186}
187
188impl NodeMetadata {
189    pub fn with_datacenter(mut self, dc: impl Into<String>) -> Self {
190        self.datacenter = Some(dc.into());
191        self
192    }
193
194    pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
195        self.rack = Some(rack.into());
196        self
197    }
198
199    pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
200        self.zone = Some(zone.into());
201        self
202    }
203
204    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
205        self.tags.push(tag.into());
206        self
207    }
208}
209
210// =============================================================================
211// Node Capacity
212// =============================================================================
213
214/// Capacity information for a node.
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct NodeCapacity {
217    pub cpu_cores: u32,
218    pub memory_mb: u64,
219    pub storage_gb: u64,
220    pub network_mbps: u32,
221}
222
223impl NodeCapacity {
224    pub fn new(cpu_cores: u32, memory_mb: u64, storage_gb: u64) -> Self {
225        Self {
226            cpu_cores,
227            memory_mb,
228            storage_gb,
229            network_mbps: 1000,
230        }
231    }
232}
233
234// =============================================================================
235// Node Health
236// =============================================================================
237
238/// Health check result for a node.
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct NodeHealth {
241    pub node_id: NodeId,
242    pub healthy: bool,
243    pub cpu_usage: f64,
244    pub memory_usage: f64,
245    pub disk_usage: f64,
246    pub latency_ms: u64,
247    pub checked_at: u64,
248}
249
250impl NodeHealth {
251    pub fn healthy(node_id: NodeId) -> Self {
252        Self {
253            node_id,
254            healthy: true,
255            cpu_usage: 0.0,
256            memory_usage: 0.0,
257            disk_usage: 0.0,
258            latency_ms: 0,
259            checked_at: current_timestamp(),
260        }
261    }
262
263    pub fn unhealthy(node_id: NodeId) -> Self {
264        Self {
265            node_id,
266            healthy: false,
267            cpu_usage: 0.0,
268            memory_usage: 0.0,
269            disk_usage: 0.0,
270            latency_ms: 0,
271            checked_at: current_timestamp(),
272        }
273    }
274}
275
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
282
283// =============================================================================
284// Tests
285// =============================================================================
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generated ids differ and carry the `node_` prefix.
    #[test]
    fn test_node_id() {
        let first = NodeId::generate();
        let second = NodeId::generate();

        assert_ne!(first, second);
        assert!(first.as_str().starts_with("node_"));
    }

    /// Walks a node through Starting -> Healthy -> Suspect -> Healthy.
    #[test]
    fn test_node_info() {
        let mut info = NodeInfo::new("node1", "127.0.0.1", 5432);

        // Freshly created nodes are starting followers.
        assert_eq!(info.status, NodeStatus::Starting);
        assert_eq!(info.role, NodeRole::Follower);

        info.mark_healthy();
        assert_eq!(info.status, NodeStatus::Healthy);
        assert!(info.is_available());

        info.mark_suspect();
        assert_eq!(info.status, NodeStatus::Suspect);

        // A heartbeat clears the suspicion.
        info.heartbeat();
        assert_eq!(info.status, NodeStatus::Healthy);
    }

    /// An IPv4 address and port combine into a socket address.
    #[test]
    fn test_socket_addr() {
        let info = NodeInfo::new("node1", "127.0.0.1", 5432);
        let port = info.socket_addr().map(|addr| addr.port());
        assert_eq!(port, Some(5432));
    }

    /// Builder calls populate the matching metadata fields.
    #[test]
    fn test_node_metadata() {
        let meta = NodeMetadata::default()
            .with_datacenter("us-east-1")
            .with_zone("a")
            .with_tag("production");

        assert_eq!(meta.datacenter.as_deref(), Some("us-east-1"));
        assert_eq!(meta.zone.as_deref(), Some("a"));
        assert!(meta.tags.iter().any(|tag| tag == "production"));
    }

    /// The two constructors set the `healthy` flag accordingly.
    #[test]
    fn test_node_health() {
        let passing = NodeHealth::healthy(NodeId::new("node1"));
        assert!(passing.healthy);

        let failing = NodeHealth::unhealthy(NodeId::new("node2"));
        assert!(!failing.healthy);
    }
}