aegis_replication/
node.rs

1//! Aegis Replication Node
2//!
3//! Node identification and management for distributed clusters.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::net::SocketAddr;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12// =============================================================================
13// Node ID
14// =============================================================================
15
16/// Unique identifier for a node in the cluster.
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct NodeId(pub String);
19
20impl NodeId {
21    pub fn new(id: impl Into<String>) -> Self {
22        Self(id.into())
23    }
24
25    pub fn generate() -> Self {
26        let timestamp = SystemTime::now()
27            .duration_since(UNIX_EPOCH)
28            .unwrap_or_default()
29            .as_nanos();
30        Self(format!("node_{:016x}", timestamp as u64))
31    }
32
33    pub fn as_str(&self) -> &str {
34        &self.0
35    }
36}
37
38impl std::fmt::Display for NodeId {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl From<String> for NodeId {
45    fn from(s: String) -> Self {
46        Self(s)
47    }
48}
49
50impl From<&str> for NodeId {
51    fn from(s: &str) -> Self {
52        Self(s.to_string())
53    }
54}
55
56// =============================================================================
57// Node Status
58// =============================================================================
59
60/// Status of a node in the cluster.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62pub enum NodeStatus {
63    /// Node is starting up.
64    Starting,
65    /// Node is healthy and operational.
66    Healthy,
67    /// Node is suspected to be failing.
68    Suspect,
69    /// Node is confirmed down.
70    Down,
71    /// Node is being removed from cluster.
72    Leaving,
73    /// Node has left the cluster.
74    Left,
75}
76
77impl Default for NodeStatus {
78    fn default() -> Self {
79        Self::Starting
80    }
81}
82
83// =============================================================================
84// Node Role
85// =============================================================================
86
87/// Role of a node in the Raft cluster.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
89pub enum NodeRole {
90    /// Follower node that replicates from leader.
91    Follower,
92    /// Candidate node during leader election.
93    Candidate,
94    /// Leader node that handles writes.
95    Leader,
96}
97
98impl Default for NodeRole {
99    fn default() -> Self {
100        Self::Follower
101    }
102}
103
104// =============================================================================
105// Node Info
106// =============================================================================
107
108/// Information about a node in the cluster.
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct NodeInfo {
111    pub id: NodeId,
112    pub address: String,
113    pub port: u16,
114    pub status: NodeStatus,
115    pub role: NodeRole,
116    pub joined_at: u64,
117    pub last_heartbeat: u64,
118    pub metadata: NodeMetadata,
119}
120
121impl NodeInfo {
122    /// Create a new node info.
123    pub fn new(id: impl Into<NodeId>, address: impl Into<String>, port: u16) -> Self {
124        let now = current_timestamp();
125        Self {
126            id: id.into(),
127            address: address.into(),
128            port,
129            status: NodeStatus::Starting,
130            role: NodeRole::Follower,
131            joined_at: now,
132            last_heartbeat: now,
133            metadata: NodeMetadata::default(),
134        }
135    }
136
137    /// Get the socket address.
138    pub fn socket_addr(&self) -> Option<SocketAddr> {
139        format!("{}:{}", self.address, self.port).parse().ok()
140    }
141
142    /// Update the heartbeat timestamp.
143    pub fn heartbeat(&mut self) {
144        self.last_heartbeat = current_timestamp();
145        if self.status == NodeStatus::Suspect {
146            self.status = NodeStatus::Healthy;
147        }
148    }
149
150    /// Mark the node as healthy.
151    pub fn mark_healthy(&mut self) {
152        self.status = NodeStatus::Healthy;
153        self.heartbeat();
154    }
155
156    /// Mark the node as suspect.
157    pub fn mark_suspect(&mut self) {
158        if self.status == NodeStatus::Healthy {
159            self.status = NodeStatus::Suspect;
160        }
161    }
162
163    /// Mark the node as down.
164    pub fn mark_down(&mut self) {
165        self.status = NodeStatus::Down;
166    }
167
168    /// Check if the node is available for requests.
169    pub fn is_available(&self) -> bool {
170        matches!(self.status, NodeStatus::Healthy)
171    }
172
173    /// Check if the node is the leader.
174    pub fn is_leader(&self) -> bool {
175        self.role == NodeRole::Leader
176    }
177
178    /// Time since last heartbeat in milliseconds.
179    pub fn heartbeat_age(&self) -> u64 {
180        current_timestamp().saturating_sub(self.last_heartbeat)
181    }
182}
183
184// =============================================================================
185// Node Metadata
186// =============================================================================
187
188/// Additional metadata about a node.
189#[derive(Debug, Clone, Default, Serialize, Deserialize)]
190pub struct NodeMetadata {
191    pub datacenter: Option<String>,
192    pub rack: Option<String>,
193    pub zone: Option<String>,
194    pub tags: Vec<String>,
195    pub capacity: Option<NodeCapacity>,
196}
197
198impl NodeMetadata {
199    pub fn with_datacenter(mut self, dc: impl Into<String>) -> Self {
200        self.datacenter = Some(dc.into());
201        self
202    }
203
204    pub fn with_rack(mut self, rack: impl Into<String>) -> Self {
205        self.rack = Some(rack.into());
206        self
207    }
208
209    pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
210        self.zone = Some(zone.into());
211        self
212    }
213
214    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
215        self.tags.push(tag.into());
216        self
217    }
218}
219
220// =============================================================================
221// Node Capacity
222// =============================================================================
223
224/// Capacity information for a node.
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct NodeCapacity {
227    pub cpu_cores: u32,
228    pub memory_mb: u64,
229    pub storage_gb: u64,
230    pub network_mbps: u32,
231}
232
233impl NodeCapacity {
234    pub fn new(cpu_cores: u32, memory_mb: u64, storage_gb: u64) -> Self {
235        Self {
236            cpu_cores,
237            memory_mb,
238            storage_gb,
239            network_mbps: 1000,
240        }
241    }
242}
243
244// =============================================================================
245// Node Health
246// =============================================================================
247
248/// Health check result for a node.
249#[derive(Debug, Clone, Serialize, Deserialize)]
250pub struct NodeHealth {
251    pub node_id: NodeId,
252    pub healthy: bool,
253    pub cpu_usage: f64,
254    pub memory_usage: f64,
255    pub disk_usage: f64,
256    pub latency_ms: u64,
257    pub checked_at: u64,
258}
259
260impl NodeHealth {
261    pub fn healthy(node_id: NodeId) -> Self {
262        Self {
263            node_id,
264            healthy: true,
265            cpu_usage: 0.0,
266            memory_usage: 0.0,
267            disk_usage: 0.0,
268            latency_ms: 0,
269            checked_at: current_timestamp(),
270        }
271    }
272
273    pub fn unhealthy(node_id: NodeId) -> Self {
274        Self {
275            node_id,
276            healthy: false,
277            cpu_usage: 0.0,
278            memory_usage: 0.0,
279            disk_usage: 0.0,
280            latency_ms: 0,
281            checked_at: current_timestamp(),
282        }
283    }
284}
285
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
292
293// =============================================================================
294// Tests
295// =============================================================================
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generated ids differ, and a generated id starts with `node_`.
    #[test]
    fn test_node_id() {
        let first = NodeId::generate();
        let second = NodeId::generate();

        assert_ne!(first, second);
        assert!(first.as_str().starts_with("node_"));
    }

    /// Walks a node through Starting -> Healthy -> Suspect -> Healthy.
    #[test]
    fn test_node_info() {
        let mut member = NodeInfo::new("node1", "127.0.0.1", 5432);

        // Freshly created: starting follower.
        assert_eq!(member.status, NodeStatus::Starting);
        assert_eq!(member.role, NodeRole::Follower);

        member.mark_healthy();
        assert_eq!(member.status, NodeStatus::Healthy);
        assert!(member.is_available());

        member.mark_suspect();
        assert_eq!(member.status, NodeStatus::Suspect);

        // A heartbeat clears suspicion.
        member.heartbeat();
        assert_eq!(member.status, NodeStatus::Healthy);
    }

    /// An IPv4 address plus port resolves to a socket address with that port.
    #[test]
    fn test_socket_addr() {
        let member = NodeInfo::new("node1", "127.0.0.1", 5432);
        let port = member.socket_addr().map(|addr| addr.port());

        assert_eq!(port, Some(5432));
    }

    /// Builder methods populate the fields they are named after.
    #[test]
    fn test_node_metadata() {
        let built = NodeMetadata::default()
            .with_datacenter("us-east-1")
            .with_zone("a")
            .with_tag("production");

        assert_eq!(built.datacenter.as_deref(), Some("us-east-1"));
        assert_eq!(built.zone.as_deref(), Some("a"));
        assert!(built.tags.iter().any(|tag| tag == "production"));
    }

    /// The two constructors set the `healthy` flag accordingly.
    #[test]
    fn test_node_health() {
        let passing = NodeHealth::healthy(NodeId::new("node1"));
        assert!(passing.healthy);

        let failing = NodeHealth::unhealthy(NodeId::new("node2"));
        assert!(!failing.healthy);
    }
}