1use crate::scenario_orchestrator::OrchestratedScenario;
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::mpsc;
13use tracing::{debug, info, warn};
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
17pub struct Node {
18 pub id: String,
19 pub address: String,
20 pub region: Option<String>,
21 pub zone: Option<String>,
22 pub capabilities: Vec<String>,
23 pub last_heartbeat: DateTime<Utc>,
24 pub status: NodeStatus,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
29#[serde(rename_all = "lowercase")]
30pub enum NodeStatus {
31 Active,
32 Inactive,
33 Degraded,
34 Failed,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct LeaderState {
40 pub leader_id: Option<String>,
41 pub term: u64,
42 pub elected_at: Option<DateTime<Utc>>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct DistributedTask {
48 pub id: String,
49 pub orchestration: OrchestratedScenario,
50 pub target_nodes: Vec<String>,
51 pub coordination_mode: CoordinationMode,
52 pub created_at: DateTime<Utc>,
53 pub started_at: Option<DateTime<Utc>>,
54 pub completed_at: Option<DateTime<Utc>>,
55 pub status: TaskStatus,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
60#[serde(rename_all = "lowercase")]
61pub enum CoordinationMode {
62 Parallel,
64 Sequential,
66 LeaderAssigned,
68 PeerToPeer,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
74#[serde(rename_all = "lowercase")]
75pub enum TaskStatus {
76 Pending,
77 Running,
78 Completed,
79 Failed,
80 Cancelled,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct NodeExecutionState {
86 pub node_id: String,
87 pub task_id: String,
88 pub status: TaskStatus,
89 pub progress: f64,
90 pub started_at: Option<DateTime<Utc>>,
91 pub completed_at: Option<DateTime<Utc>>,
92 pub error: Option<String>,
93 pub metrics: ExecutionMetrics,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize, Default)]
98pub struct ExecutionMetrics {
99 pub steps_completed: usize,
100 pub steps_total: usize,
101 pub events_generated: usize,
102 pub errors: usize,
103 pub avg_latency_ms: f64,
104}
105
106pub struct DistributedCoordinator {
108 node_id: String,
110 nodes: Arc<RwLock<HashMap<String, Node>>>,
112 leader_state: Arc<RwLock<LeaderState>>,
114 tasks: Arc<RwLock<HashMap<String, DistributedTask>>>,
116 execution_states: Arc<RwLock<HashMap<String, NodeExecutionState>>>,
118 control_tx: Option<mpsc::Sender<CoordinatorControl>>,
120}
121
122enum CoordinatorControl {
124 RegisterNode(Node),
125 UnregisterNode(String),
126 SubmitTask(DistributedTask),
127 Heartbeat(String),
128 TriggerElection,
129}
130
131impl DistributedCoordinator {
132 pub fn new(node_id: impl Into<String>) -> Self {
134 Self {
135 node_id: node_id.into(),
136 nodes: Arc::new(RwLock::new(HashMap::new())),
137 leader_state: Arc::new(RwLock::new(LeaderState {
138 leader_id: None,
139 term: 0,
140 elected_at: None,
141 })),
142 tasks: Arc::new(RwLock::new(HashMap::new())),
143 execution_states: Arc::new(RwLock::new(HashMap::new())),
144 control_tx: None,
145 }
146 }
147
148 pub async fn start(&mut self) -> Result<(), String> {
150 info!("Starting distributed coordinator for node {}", self.node_id);
151
152 let (control_tx, mut control_rx) = mpsc::channel::<CoordinatorControl>(100);
154 self.control_tx = Some(control_tx);
155
156 let node_id = self.node_id.clone();
158 let nodes = Arc::clone(&self.nodes);
159 let leader_state = Arc::clone(&self.leader_state);
160 let tasks = Arc::clone(&self.tasks);
161 let execution_states = Arc::clone(&self.execution_states);
162
163 tokio::spawn(async move {
165 Self::coordinator_task(
166 node_id,
167 nodes,
168 leader_state,
169 tasks,
170 execution_states,
171 &mut control_rx,
172 )
173 .await;
174 });
175
176 Ok(())
177 }
178
179 async fn coordinator_task(
181 node_id: String,
182 nodes: Arc<RwLock<HashMap<String, Node>>>,
183 leader_state: Arc<RwLock<LeaderState>>,
184 tasks: Arc<RwLock<HashMap<String, DistributedTask>>>,
185 _execution_states: Arc<RwLock<HashMap<String, NodeExecutionState>>>,
186 control_rx: &mut mpsc::Receiver<CoordinatorControl>,
187 ) {
188 loop {
189 tokio::select! {
190 Some(cmd) = control_rx.recv() => {
191 match cmd {
192 CoordinatorControl::RegisterNode(node) => {
193 info!("Registering node: {}", node.id);
194 let mut nodes_guard = nodes.write();
195 nodes_guard.insert(node.id.clone(), node);
196 }
197 CoordinatorControl::UnregisterNode(id) => {
198 info!("Unregistering node: {}", id);
199 let mut nodes_guard = nodes.write();
200 nodes_guard.remove(&id);
201 }
202 CoordinatorControl::SubmitTask(task) => {
203 info!("Submitting task: {}", task.id);
204 let mut tasks_guard = tasks.write();
205 tasks_guard.insert(task.id.clone(), task);
206 }
207 CoordinatorControl::Heartbeat(node_id) => {
208 debug!("Heartbeat from node: {}", node_id);
209 let mut nodes_guard = nodes.write();
210 if let Some(node) = nodes_guard.get_mut(&node_id) {
211 node.last_heartbeat = Utc::now();
212 }
213 }
214 CoordinatorControl::TriggerElection => {
215 info!("Triggering leader election");
216 Self::elect_leader(&node_id, &nodes, &leader_state);
217 }
218 }
219 }
220 _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => {
221 Self::check_node_health(&nodes);
223
224 let needs_election = {
226 let state = leader_state.read();
227 state.leader_id.is_none()
228 };
229
230 if needs_election {
231 Self::elect_leader(&node_id, &nodes, &leader_state);
232 }
233 }
234 }
235 }
236 }
237
238 pub async fn register_node(&self, node: Node) -> Result<(), String> {
240 if let Some(ref tx) = self.control_tx {
241 tx.send(CoordinatorControl::RegisterNode(node))
242 .await
243 .map_err(|e| format!("Failed to register node: {}", e))?;
244 Ok(())
245 } else {
246 Err("Coordinator not started".to_string())
247 }
248 }
249
250 pub async fn unregister_node(&self, node_id: &str) -> Result<(), String> {
252 if let Some(ref tx) = self.control_tx {
253 tx.send(CoordinatorControl::UnregisterNode(node_id.to_string()))
254 .await
255 .map_err(|e| format!("Failed to unregister node: {}", e))?;
256 Ok(())
257 } else {
258 Err("Coordinator not started".to_string())
259 }
260 }
261
262 pub async fn submit_task(&self, task: DistributedTask) -> Result<(), String> {
264 if let Some(ref tx) = self.control_tx {
265 tx.send(CoordinatorControl::SubmitTask(task))
266 .await
267 .map_err(|e| format!("Failed to submit task: {}", e))?;
268 Ok(())
269 } else {
270 Err("Coordinator not started".to_string())
271 }
272 }
273
274 pub async fn heartbeat(&self) -> Result<(), String> {
276 if let Some(ref tx) = self.control_tx {
277 tx.send(CoordinatorControl::Heartbeat(self.node_id.clone()))
278 .await
279 .map_err(|e| format!("Failed to send heartbeat: {}", e))?;
280 Ok(())
281 } else {
282 Err("Coordinator not started".to_string())
283 }
284 }
285
286 pub async fn trigger_election(&self) -> Result<(), String> {
288 if let Some(ref tx) = self.control_tx {
289 tx.send(CoordinatorControl::TriggerElection)
290 .await
291 .map_err(|e| format!("Failed to trigger election: {}", e))?;
292 Ok(())
293 } else {
294 Err("Coordinator not started".to_string())
295 }
296 }
297
298 fn elect_leader(
300 _current_node_id: &str,
301 nodes: &Arc<RwLock<HashMap<String, Node>>>,
302 leader_state: &Arc<RwLock<LeaderState>>,
303 ) {
304 let nodes_guard = nodes.read();
305
306 let active_nodes: Vec<_> =
308 nodes_guard.values().filter(|n| n.status == NodeStatus::Active).collect();
309
310 if active_nodes.is_empty() {
311 warn!("No active nodes for leader election");
312 return;
313 }
314
315 let leader = active_nodes.iter().min_by(|a, b| a.id.cmp(&b.id)).unwrap();
317
318 let mut state = leader_state.write();
319 state.leader_id = Some(leader.id.clone());
320 state.term += 1;
321 state.elected_at = Some(Utc::now());
322
323 info!("Leader elected: {} (term {})", leader.id, state.term);
324 }
325
326 fn check_node_health(nodes: &Arc<RwLock<HashMap<String, Node>>>) {
328 let mut nodes_guard = nodes.write();
329 let now = Utc::now();
330 let timeout = chrono::Duration::seconds(30);
331
332 for node in nodes_guard.values_mut() {
333 if node.status == NodeStatus::Active {
334 let since_heartbeat = now - node.last_heartbeat;
335 if since_heartbeat > timeout {
336 warn!("Node {} missed heartbeat", node.id);
337 node.status = NodeStatus::Degraded;
338 }
339 if since_heartbeat > timeout * 2 {
340 warn!("Node {} failed (no heartbeat)", node.id);
341 node.status = NodeStatus::Failed;
342 }
343 }
344 }
345 }
346
347 pub fn get_leader(&self) -> Option<String> {
349 let state = self.leader_state.read();
350 state.leader_id.clone()
351 }
352
353 pub fn is_leader(&self) -> bool {
355 let state = self.leader_state.read();
356 state.leader_id.as_ref() == Some(&self.node_id)
357 }
358
359 pub fn get_nodes(&self) -> Vec<Node> {
361 let nodes = self.nodes.read();
362 nodes.values().cloned().collect()
363 }
364
365 pub fn get_active_nodes(&self) -> Vec<Node> {
367 let nodes = self.nodes.read();
368 nodes.values().filter(|n| n.status == NodeStatus::Active).cloned().collect()
369 }
370
371 pub fn get_task(&self, task_id: &str) -> Option<DistributedTask> {
373 let tasks = self.tasks.read();
374 tasks.get(task_id).cloned()
375 }
376
377 pub fn get_tasks(&self) -> Vec<DistributedTask> {
379 let tasks = self.tasks.read();
380 tasks.values().cloned().collect()
381 }
382
383 pub fn get_task_execution_states(&self, task_id: &str) -> Vec<NodeExecutionState> {
385 let states = self.execution_states.read();
386 states
387 .iter()
388 .filter(|(k, _)| k.starts_with(&format!("{}:", task_id)))
389 .map(|(_, v)| v.clone())
390 .collect()
391 }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `Active` node with a heartbeat timestamp of "now".
    fn active_node(
        id: &str,
        address: &str,
        region: Option<&str>,
        zone: Option<&str>,
        capabilities: &[&str],
    ) -> Node {
        Node {
            id: id.to_string(),
            address: address.to_string(),
            region: region.map(str::to_string),
            zone: zone.map(str::to_string),
            capabilities: capabilities.iter().map(|c| c.to_string()).collect(),
            last_heartbeat: Utc::now(),
            status: NodeStatus::Active,
        }
    }

    /// A fresh coordinator remembers its node ID and is not the leader.
    #[test]
    fn test_coordinator_creation() {
        let coordinator = DistributedCoordinator::new("node-1");

        assert_eq!(coordinator.node_id, "node-1");
        assert!(!coordinator.is_leader());
    }

    /// A node built as active reports the active status.
    #[test]
    fn test_node_status() {
        let node = active_node(
            "node-1",
            "127.0.0.1:8080",
            Some("us-east-1"),
            Some("us-east-1a"),
            &["chaos"],
        );

        assert_eq!(node.status, NodeStatus::Active);
    }

    /// Starting the coordinator succeeds.
    #[tokio::test]
    async fn test_coordinator_start() {
        let mut coordinator = DistributedCoordinator::new("node-1");

        let started = coordinator.start().await;

        assert!(started.is_ok());
    }

    /// A registered node becomes visible through `get_nodes`.
    #[tokio::test]
    async fn test_register_node() {
        let settle = tokio::time::Duration::from_millis(100);

        let mut coordinator = DistributedCoordinator::new("node-1");
        coordinator.start().await.unwrap();

        let node = active_node("node-2", "127.0.0.1:8081", None, None, &[]);

        // Give the background task time to start.
        tokio::time::sleep(settle).await;

        assert!(coordinator.register_node(node).await.is_ok());

        // Registration is applied asynchronously; wait for it to land.
        tokio::time::sleep(settle).await;

        let registered = coordinator.get_nodes();
        assert!(registered.iter().any(|n| n.id == "node-2"));
    }
}
452}