mockforge_chaos/distributed_coordinator.rs

//! Distributed chaos coordination
//!
//! Coordinate chaos orchestrations across distributed systems with leader election,
//! state synchronization, and distributed consensus.
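//!
//! # Example
//!
//! A minimal usage sketch (marked `ignore` because the async runtime setup and
//! error handling around it are assumed rather than shown here):
//!
//! ```rust,ignore
//! let mut coordinator = DistributedCoordinator::new("node-1");
//! coordinator.start().await?;
//!
//! coordinator
//!     .register_node(Node {
//!         id: "node-1".to_string(),
//!         address: "127.0.0.1:8080".to_string(),
//!         region: None,
//!         zone: None,
//!         capabilities: vec!["chaos".to_string()],
//!         last_heartbeat: Utc::now(),
//!         status: NodeStatus::Active,
//!     })
//!     .await?;
//!
//! // The background task elects a leader on its periodic tick.
//! if coordinator.is_leader() {
//!     // Submit a DistributedTask from the leader node...
//! }
//! ```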

use crate::scenario_orchestrator::OrchestratedScenario;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

/// Node in the distributed system
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Node {
    pub id: String,
    pub address: String,
    pub region: Option<String>,
    pub zone: Option<String>,
    pub capabilities: Vec<String>,
    pub last_heartbeat: DateTime<Utc>,
    pub status: NodeStatus,
}

/// Node status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum NodeStatus {
    Active,
    Inactive,
    Degraded,
    Failed,
}

/// Leader election state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaderState {
    pub leader_id: Option<String>,
    pub term: u64,
    pub elected_at: Option<DateTime<Utc>>,
}

/// Distributed orchestration task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTask {
    pub id: String,
    pub orchestration: OrchestratedScenario,
    pub target_nodes: Vec<String>,
    pub coordination_mode: CoordinationMode,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub status: TaskStatus,
}

/// Coordination mode for distributed execution
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum CoordinationMode {
    /// All nodes execute in parallel
    Parallel,
    /// One node at a time
    Sequential,
    /// Leader assigns tasks
    LeaderAssigned,
    /// Nodes coordinate amongst themselves
    PeerToPeer,
}

/// Task status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
}

/// Node execution state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeExecutionState {
    pub node_id: String,
    pub task_id: String,
    pub status: TaskStatus,
    pub progress: f64,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub error: Option<String>,
    pub metrics: ExecutionMetrics,
}

/// Execution metrics for a node
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ExecutionMetrics {
    pub steps_completed: usize,
    pub steps_total: usize,
    pub events_generated: usize,
    pub errors: usize,
    pub avg_latency_ms: f64,
}

/// Distributed coordinator
pub struct DistributedCoordinator {
    /// Current node ID
    node_id: String,
    /// Registered nodes
    nodes: Arc<RwLock<HashMap<String, Node>>>,
    /// Leader state
    leader_state: Arc<RwLock<LeaderState>>,
    /// Active tasks
    tasks: Arc<RwLock<HashMap<String, DistributedTask>>>,
    /// Node execution states
    execution_states: Arc<RwLock<HashMap<String, NodeExecutionState>>>,
    /// Control channel
    control_tx: Option<mpsc::Sender<CoordinatorControl>>,
}

/// Coordinator control commands
enum CoordinatorControl {
    RegisterNode(Node),
    UnregisterNode(String),
    SubmitTask(DistributedTask),
    Heartbeat(String),
    TriggerElection,
}

impl DistributedCoordinator {
    /// Create a new distributed coordinator
    pub fn new(node_id: impl Into<String>) -> Self {
        Self {
            node_id: node_id.into(),
            nodes: Arc::new(RwLock::new(HashMap::new())),
            leader_state: Arc::new(RwLock::new(LeaderState {
                leader_id: None,
                term: 0,
                elected_at: None,
            })),
            tasks: Arc::new(RwLock::new(HashMap::new())),
            execution_states: Arc::new(RwLock::new(HashMap::new())),
            control_tx: None,
        }
    }

    /// Start the coordinator
    pub async fn start(&mut self) -> Result<(), String> {
        info!("Starting distributed coordinator for node {}", self.node_id);

        // Create control channel
        let (control_tx, mut control_rx) = mpsc::channel::<CoordinatorControl>(100);
        self.control_tx = Some(control_tx);

        // Clone Arc references for background task
        let node_id = self.node_id.clone();
        let nodes = Arc::clone(&self.nodes);
        let leader_state = Arc::clone(&self.leader_state);
        let tasks = Arc::clone(&self.tasks);
        let execution_states = Arc::clone(&self.execution_states);

        // Spawn coordinator task
        tokio::spawn(async move {
            Self::coordinator_task(
                node_id,
                nodes,
                leader_state,
                tasks,
                execution_states,
                &mut control_rx,
            )
            .await;
        });

        Ok(())
    }

    /// Coordinator background task
    async fn coordinator_task(
        node_id: String,
        nodes: Arc<RwLock<HashMap<String, Node>>>,
        leader_state: Arc<RwLock<LeaderState>>,
        tasks: Arc<RwLock<HashMap<String, DistributedTask>>>,
        _execution_states: Arc<RwLock<HashMap<String, NodeExecutionState>>>,
        control_rx: &mut mpsc::Receiver<CoordinatorControl>,
    ) {
        loop {
            tokio::select! {
                maybe_cmd = control_rx.recv() => {
                    // recv() returns None once every sender is dropped; stop
                    // the background task instead of looping forever.
                    let Some(cmd) = maybe_cmd else {
                        info!("Control channel closed; stopping coordinator task");
                        break;
                    };
                    match cmd {
                        CoordinatorControl::RegisterNode(node) => {
                            info!("Registering node: {}", node.id);
                            let mut nodes_guard = nodes.write();
                            nodes_guard.insert(node.id.clone(), node);
                        }
                        CoordinatorControl::UnregisterNode(id) => {
                            info!("Unregistering node: {}", id);
                            let mut nodes_guard = nodes.write();
                            nodes_guard.remove(&id);
                        }
                        CoordinatorControl::SubmitTask(task) => {
                            info!("Submitting task: {}", task.id);
                            let mut tasks_guard = tasks.write();
                            tasks_guard.insert(task.id.clone(), task);
                        }
                        CoordinatorControl::Heartbeat(hb_node_id) => {
                            debug!("Heartbeat from node: {}", hb_node_id);
                            let mut nodes_guard = nodes.write();
                            if let Some(node) = nodes_guard.get_mut(&hb_node_id) {
                                node.last_heartbeat = Utc::now();
                                // A degraded node that resumes heartbeats recovers
                                if node.status == NodeStatus::Degraded {
                                    node.status = NodeStatus::Active;
                                }
                            }
                        }
                        CoordinatorControl::TriggerElection => {
                            info!("Triggering leader election");
                            Self::elect_leader(&node_id, &nodes, &leader_state);
                        }
                    }
                }
                _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => {
                    // Periodic health checks
                    Self::check_node_health(&nodes);

                    // An election is needed when there is no leader yet, or
                    // when the current leader is no longer an active node.
                    let current_leader = {
                        let state = leader_state.read();
                        state.leader_id.clone()
                    };
                    let needs_election = match current_leader {
                        None => true,
                        Some(leader_id) => {
                            let nodes_guard = nodes.read();
                            nodes_guard
                                .get(&leader_id)
                                .map_or(true, |n| n.status != NodeStatus::Active)
                        }
                    };

                    if needs_election {
                        Self::elect_leader(&node_id, &nodes, &leader_state);
                    }
                }
            }
        }
    }

    /// Register a node
    pub async fn register_node(&self, node: Node) -> Result<(), String> {
        if let Some(ref tx) = self.control_tx {
            tx.send(CoordinatorControl::RegisterNode(node))
                .await
                .map_err(|e| format!("Failed to register node: {}", e))?;
            Ok(())
        } else {
            Err("Coordinator not started".to_string())
        }
    }

    /// Unregister a node
    pub async fn unregister_node(&self, node_id: &str) -> Result<(), String> {
        if let Some(ref tx) = self.control_tx {
            tx.send(CoordinatorControl::UnregisterNode(node_id.to_string()))
                .await
                .map_err(|e| format!("Failed to unregister node: {}", e))?;
            Ok(())
        } else {
            Err("Coordinator not started".to_string())
        }
    }

    /// Submit a distributed task
    pub async fn submit_task(&self, task: DistributedTask) -> Result<(), String> {
        if let Some(ref tx) = self.control_tx {
            tx.send(CoordinatorControl::SubmitTask(task))
                .await
                .map_err(|e| format!("Failed to submit task: {}", e))?;
            Ok(())
        } else {
            Err("Coordinator not started".to_string())
        }
    }

    /// Send heartbeat
    pub async fn heartbeat(&self) -> Result<(), String> {
        if let Some(ref tx) = self.control_tx {
            tx.send(CoordinatorControl::Heartbeat(self.node_id.clone()))
                .await
                .map_err(|e| format!("Failed to send heartbeat: {}", e))?;
            Ok(())
        } else {
            Err("Coordinator not started".to_string())
        }
    }

    /// Trigger leader election
    pub async fn trigger_election(&self) -> Result<(), String> {
        if let Some(ref tx) = self.control_tx {
            tx.send(CoordinatorControl::TriggerElection)
                .await
                .map_err(|e| format!("Failed to trigger election: {}", e))?;
            Ok(())
        } else {
            Err("Coordinator not started".to_string())
        }
    }

    /// Elect leader (simple implementation: lowest active node ID wins)
    fn elect_leader(
        _current_node_id: &str,
        nodes: &Arc<RwLock<HashMap<String, Node>>>,
        leader_state: &Arc<RwLock<LeaderState>>,
    ) {
        let nodes_guard = nodes.read();

        // Find active nodes
        let active_nodes: Vec<_> =
            nodes_guard.values().filter(|n| n.status == NodeStatus::Active).collect();

        if active_nodes.is_empty() {
            warn!("No active nodes for leader election");
            // Clear any stale leader so callers don't see a failed node as leader
            let mut state = leader_state.write();
            state.leader_id = None;
            return;
        }

        // Simple election: node with lowest ID becomes leader
        let leader = active_nodes.iter().min_by(|a, b| a.id.cmp(&b.id)).unwrap();

        let mut state = leader_state.write();
        state.leader_id = Some(leader.id.clone());
        state.term += 1;
        state.elected_at = Some(Utc::now());

        info!("Leader elected: {} (term {})", leader.id, state.term);
    }

    /// Check node health and demote nodes with stale heartbeats
    fn check_node_health(nodes: &Arc<RwLock<HashMap<String, Node>>>) {
        let mut nodes_guard = nodes.write();
        let now = Utc::now();
        let timeout = chrono::Duration::seconds(30);

        for node in nodes_guard.values_mut() {
            // Re-evaluate both active and degraded nodes; including degraded
            // ones lets them progress to failed on a later tick.
            if node.status != NodeStatus::Active && node.status != NodeStatus::Degraded {
                continue;
            }
            let since_heartbeat = now - node.last_heartbeat;
            if since_heartbeat > timeout * 2 {
                warn!("Node {} failed (no heartbeat)", node.id);
                node.status = NodeStatus::Failed;
            } else if since_heartbeat > timeout && node.status == NodeStatus::Active {
                warn!("Node {} missed heartbeat", node.id);
                node.status = NodeStatus::Degraded;
            }
        }
    }

    /// Get current leader
    pub fn get_leader(&self) -> Option<String> {
        let state = self.leader_state.read();
        state.leader_id.clone()
    }

    /// Check if this node is the leader
    pub fn is_leader(&self) -> bool {
        let state = self.leader_state.read();
        state.leader_id.as_ref() == Some(&self.node_id)
    }

    /// Get all registered nodes
    pub fn get_nodes(&self) -> Vec<Node> {
        let nodes = self.nodes.read();
        nodes.values().cloned().collect()
    }

    /// Get active nodes
    pub fn get_active_nodes(&self) -> Vec<Node> {
        let nodes = self.nodes.read();
        nodes.values().filter(|n| n.status == NodeStatus::Active).cloned().collect()
    }

    /// Get task status
    pub fn get_task(&self, task_id: &str) -> Option<DistributedTask> {
        let tasks = self.tasks.read();
        tasks.get(task_id).cloned()
    }

    /// Get all tasks
    pub fn get_tasks(&self) -> Vec<DistributedTask> {
        let tasks = self.tasks.read();
        tasks.values().cloned().collect()
    }

    /// Get node execution states for a task
    pub fn get_task_execution_states(&self, task_id: &str) -> Vec<NodeExecutionState> {
        let states = self.execution_states.read();
        states
            .iter()
            .filter(|(k, _)| k.starts_with(&format!("{}:", task_id)))
            .map(|(_, v)| v.clone())
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_coordinator_creation() {
        let coordinator = DistributedCoordinator::new("node-1");
        assert_eq!(coordinator.node_id, "node-1");
        assert!(!coordinator.is_leader());
    }

    #[test]
    fn test_node_status() {
        let node = Node {
            id: "node-1".to_string(),
            address: "127.0.0.1:8080".to_string(),
            region: Some("us-east-1".to_string()),
            zone: Some("us-east-1a".to_string()),
            capabilities: vec!["chaos".to_string()],
            last_heartbeat: Utc::now(),
            status: NodeStatus::Active,
        };

        assert_eq!(node.status, NodeStatus::Active);
    }

    #[tokio::test]
    async fn test_coordinator_start() {
        let mut coordinator = DistributedCoordinator::new("node-1");
        assert!(coordinator.start().await.is_ok());
    }

    #[tokio::test]
    async fn test_register_node() {
        let mut coordinator = DistributedCoordinator::new("node-1");
        coordinator.start().await.unwrap();

        let node = Node {
            id: "node-2".to_string(),
            address: "127.0.0.1:8081".to_string(),
            region: None,
            zone: None,
            capabilities: vec![],
            last_heartbeat: Utc::now(),
            status: NodeStatus::Active,
        };

        // Give the coordinator task time to start
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        assert!(coordinator.register_node(node).await.is_ok());

        // Wait for registration to process
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let nodes = coordinator.get_nodes();
        assert!(nodes.iter().any(|n| n.id == "node-2"));
    }
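
    // Two additional sketch tests for the private helpers. They rely on the
    // fact that a child module (`tests`) can reach its parent's private items;
    // the node fixtures below are illustrative values, not part of the
    // original suite.
    #[test]
    fn test_elect_leader_picks_lowest_active_id() {
        let nodes = Arc::new(RwLock::new(HashMap::new()));
        for id in ["node-3", "node-1", "node-2"] {
            nodes.write().insert(
                id.to_string(),
                Node {
                    id: id.to_string(),
                    address: "127.0.0.1:0".to_string(),
                    region: None,
                    zone: None,
                    capabilities: vec![],
                    last_heartbeat: Utc::now(),
                    status: NodeStatus::Active,
                },
            );
        }
        let leader_state = Arc::new(RwLock::new(LeaderState {
            leader_id: None,
            term: 0,
            elected_at: None,
        }));

        DistributedCoordinator::elect_leader("node-3", &nodes, &leader_state);

        let state = leader_state.read();
        assert_eq!(state.leader_id.as_deref(), Some("node-1"));
        assert_eq!(state.term, 1);
    }

    #[test]
    fn test_check_node_health_fails_stale_node() {
        let nodes = Arc::new(RwLock::new(HashMap::new()));
        nodes.write().insert(
            "stale".to_string(),
            Node {
                id: "stale".to_string(),
                address: "127.0.0.1:0".to_string(),
                region: None,
                zone: None,
                capabilities: vec![],
                // Older than twice the 30-second heartbeat timeout
                last_heartbeat: Utc::now() - chrono::Duration::seconds(120),
                status: NodeStatus::Active,
            },
        );

        DistributedCoordinator::check_node_health(&nodes);

        assert_eq!(nodes.read().get("stale").unwrap().status, NodeStatus::Failed);
    }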
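
    // A sketch of the wire format (assumes `serde_json` is available as a
    // dev-dependency, which this file cannot confirm): `rename_all =
    // "lowercase"` lowercases variant names without inserting separators.
    #[test]
    fn test_coordination_mode_wire_format() {
        let json = serde_json::to_string(&CoordinationMode::LeaderAssigned).unwrap();
        assert_eq!(json, "\"leaderassigned\"");
    }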
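
    // Sketch of the execution-state lookup; the `"{task_id}:{node_id}"`
    // composite key format is inferred from the prefix filter in
    // `get_task_execution_states`, not documented elsewhere.
    #[test]
    fn test_get_task_execution_states_prefix_match() {
        let coordinator = DistributedCoordinator::new("node-1");
        coordinator.execution_states.write().insert(
            "task-1:node-1".to_string(),
            NodeExecutionState {
                node_id: "node-1".to_string(),
                task_id: "task-1".to_string(),
                status: TaskStatus::Running,
                progress: 0.5,
                started_at: Some(Utc::now()),
                completed_at: None,
                error: None,
                metrics: ExecutionMetrics::default(),
            },
        );
        coordinator.execution_states.write().insert(
            "task-10:node-1".to_string(),
            NodeExecutionState {
                node_id: "node-1".to_string(),
                task_id: "task-10".to_string(),
                status: TaskStatus::Pending,
                progress: 0.0,
                started_at: None,
                completed_at: None,
                error: None,
                metrics: ExecutionMetrics::default(),
            },
        );

        // The trailing ':' in the prefix keeps "task-1" from matching "task-10"
        let states = coordinator.get_task_execution_states("task-1");
        assert_eq!(states.len(), 1);
        assert_eq!(states[0].task_id, "task-1");
    }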
}