use std::collections::{HashMap, VecDeque};
use std::time::Instant;
use super::config::{
ClusterConfig, CommunicationProtocol, DistributedComputingConfig, NodeResources,
};
/// Bundles the distributed-computing configuration with the cluster, communication
/// and fault-tolerance managers.
pub struct DistributedCoordinator {
    /// Configuration handed to [`DistributedCoordinator::new`].
    pub config: DistributedComputingConfig,
    /// Registry of cluster nodes.
    pub cluster_manager: ClusterManager,
    /// Connection and message-queue bookkeeping.
    pub communication_manager: CommunicationManager,
    /// Failure, recovery-count and checkpoint bookkeeping.
    pub fault_tolerance_manager: FaultToleranceManager,
}

impl DistributedCoordinator {
    /// Creates a coordinator that owns `config` and starts with freshly built, empty managers.
    ///
    /// NOTE(review): each manager is built from its own defaults; nothing from `config`
    /// is forwarded to them — confirm that is intended.
    #[must_use]
    pub fn new(config: DistributedComputingConfig) -> Self {
        let cluster_manager = ClusterManager::new();
        let communication_manager = CommunicationManager::new();
        let fault_tolerance_manager = FaultToleranceManager::new();
        Self {
            config,
            cluster_manager,
            communication_manager,
            fault_tolerance_manager,
        }
    }

    /// Reports the `enable_distributed` flag of the configuration.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        let Self { config, .. } = self;
        config.enable_distributed
    }

    /// Number of nodes registered with the cluster manager, whatever their status.
    #[must_use]
    pub fn cluster_size(&self) -> usize {
        let registered = &self.cluster_manager.active_nodes;
        registered.len()
    }
}
/// Registry of cluster nodes and their per-node statistics, both keyed by node address.
#[derive(Debug)]
pub struct ClusterManager {
    /// Cluster-wide configuration; [`ClusterManager::new`] uses `ClusterConfig::default()`.
    pub config: ClusterConfig,
    /// Every registered node keyed by address, regardless of its current status.
    pub active_nodes: HashMap<String, ClusterNode>,
    /// Statistics keyed by address; inserted by `add_node` and removed by `remove_node`.
    pub node_statistics: HashMap<String, NodeStatistics>,
}

impl ClusterManager {
    /// Builds an empty registry that uses the default cluster configuration.
    #[must_use]
    pub fn new() -> Self {
        let config = ClusterConfig::default();
        Self {
            config,
            active_nodes: HashMap::new(),
            node_statistics: HashMap::new(),
        }
    }

    /// Registers a node at `address` as `Active`, with an idle workload and zeroed statistics.
    ///
    /// Adding an address that is already registered replaces both its node record and
    /// its statistics.
    pub fn add_node(&mut self, address: String, resources: NodeResources) {
        self.node_statistics
            .insert(address.clone(), NodeStatistics::default());
        let key = address.clone();
        self.active_nodes.insert(
            key,
            ClusterNode {
                address,
                resources,
                status: NodeStatus::Active,
                current_workload: NodeWorkload::default(),
            },
        );
    }

    /// Unregisters `address`, discarding its statistics and returning the node record if present.
    pub fn remove_node(&mut self, address: &str) -> Option<ClusterNode> {
        let removed = self.active_nodes.remove(address);
        self.node_statistics.remove(address);
        removed
    }

    /// Collects references to all nodes whose status is `NodeStatus::Active`.
    ///
    /// The order follows `HashMap` iteration and is therefore unspecified.
    #[must_use]
    pub fn available_nodes(&self) -> Vec<&ClusterNode> {
        self.active_nodes
            .values()
            .filter(|candidate| matches!(candidate.status, NodeStatus::Active))
            .collect()
    }

    /// Sets the status of the node registered at `address`; unknown addresses are ignored.
    pub fn update_node_status(&mut self, address: &str, status: NodeStatus) {
        if let Some(target) = self.active_nodes.get_mut(address) {
            target.status = status;
        }
    }

    /// Looks up the node registered at `address`.
    #[must_use]
    pub fn get_node(&self, address: &str) -> Option<&ClusterNode> {
        let nodes = &self.active_nodes;
        nodes.get(address)
    }
}

impl Default for ClusterManager {
    /// Same as [`ClusterManager::new`].
    fn default() -> Self {
        ClusterManager::new()
    }
}
/// A node registered with `ClusterManager`.
#[derive(Debug)]
pub struct ClusterNode {
    /// Address the node was registered under; `ClusterManager::add_node` also uses it as the map key.
    pub address: String,
    /// Resources supplied when the node was added.
    pub resources: NodeResources,
    /// Current status; `ClusterManager::add_node` starts nodes as `NodeStatus::Active`.
    pub status: NodeStatus,
    /// Current task list and utilization figures.
    pub current_workload: NodeWorkload,
}

impl ClusterNode {
    /// Returns `true` when the node is `Active` and its CPU utilization is strictly below 0.9.
    #[must_use]
    pub fn is_available(&self) -> bool {
        // 0.9 is the CPU-utilization cut-off above which a node is not offered new work.
        let below_cpu_limit = self.current_workload.cpu_utilization < 0.9;
        matches!(self.status, NodeStatus::Active) && below_cpu_limit
    }

    /// Spare CPU capacity, computed as `1.0 - cpu_utilization`.
    ///
    /// The value is not clamped, so it becomes negative if utilization exceeds 1.0.
    #[must_use]
    pub fn available_capacity(&self) -> f64 {
        let used = self.current_workload.cpu_utilization;
        1.0 - used
    }
}
/// Status of a [`ClusterNode`].
///
/// `ClusterManager::add_node` registers nodes as `Active`, and
/// `ClusterManager::update_node_status` may set any variant. Only `Active` nodes are
/// returned by `ClusterManager::available_nodes`. The remaining variants are not otherwise
/// interpreted in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeStatus {
    /// Eligible for work; the only status counted by `available_nodes`.
    Active,
    Busy,
    Unavailable,
    Failed,
    Maintenance,
}
/// Task list and resource-utilization snapshot for a node.
///
/// `Default` yields an empty task list and zero for every utilization figure.
/// NOTE(review): utilizations are presumably fractions in `[0, 1]` (`ClusterNode` uses
/// 0.9 as a CPU cut-off and `1.0 - cpu` as spare capacity) — confirm with producers.
// `Default` is derived: the previous hand-written impl produced exactly the derived
// value (empty Vec, 0.0 floats), which clippy flags as `derivable_impls`.
#[derive(Debug, Clone, Default)]
pub struct NodeWorkload {
    /// Tasks currently assigned to the node (presumably task ids — confirm).
    pub active_tasks: Vec<String>,
    /// CPU utilization.
    pub cpu_utilization: f64,
    /// Memory utilization.
    pub memory_utilization: f64,
    /// Network utilization.
    pub network_utilization: f64,
}

impl NodeWorkload {
    /// Unweighted mean of CPU, memory and network utilization.
    #[must_use]
    pub fn overall_load(&self) -> f64 {
        (self.cpu_utilization + self.memory_utilization + self.network_utilization) / 3.0
    }
}
/// Tracks per-peer connections, in-memory FIFO message queues and aggregate counters.
///
/// Each connected peer address gets one [`Connection`] entry and one queue;
/// [`CommunicationManager::send`] appends to the queue of a destination and
/// [`CommunicationManager::receive`] drains the queue of a source.
///
/// NOTE(review): nothing in this impl updates `statistics.total_bytes` or the
/// per-`Connection` statistics — confirm whether byte accounting is expected here.
#[derive(Debug)]
pub struct CommunicationManager {
    /// Protocol setting, initialised to TCP by `new`; the queue operations here do not consult it.
    pub protocol: CommunicationProtocol,
    /// Open connections keyed by remote address.
    pub connections: HashMap<String, Connection>,
    /// Pending messages keyed by peer address; created by `connect` and removed by `disconnect`.
    pub message_queues: HashMap<String, VecDeque<Message>>,
    /// Aggregate counters for this manager.
    pub statistics: CommunicationStatistics,
}

impl CommunicationManager {
    /// Creates a manager with no connections, zeroed statistics and the TCP protocol.
    #[must_use]
    pub fn new() -> Self {
        Self {
            protocol: CommunicationProtocol::TCP,
            connections: HashMap::new(),
            message_queues: HashMap::new(),
            statistics: CommunicationStatistics::default(),
        }
    }

    /// Registers a connection to `address` and creates an empty message queue for it.
    ///
    /// Connecting to an address that is already connected is a no-op.
    ///
    /// # Errors
    ///
    /// This implementation never returns `Err`.
    pub fn connect(&mut self, address: &str) -> Result<(), String> {
        if self.connections.contains_key(address) {
            return Ok(());
        }
        // Use the monotonically increasing establishment counter for the id. The size
        // of `connections` shrinks on `disconnect`, so an id derived from it could
        // duplicate the id of a connection that is still open.
        let seq = self.statistics.connections_established;
        let connection = Connection {
            id: format!("conn_{seq}"),
            remote_address: address.to_string(),
            status: ConnectionStatus::Active,
            statistics: ConnectionStatistics::default(),
        };
        self.connections.insert(address.to_string(), connection);
        self.message_queues
            .insert(address.to_string(), VecDeque::new());
        self.statistics.connections_established += 1;
        Ok(())
    }

    /// Removes the connection to `address` and discards any messages queued for it.
    ///
    /// `connections_closed` is incremented only when a connection was actually present.
    pub fn disconnect(&mut self, address: &str) {
        // The removed `Connection` is dropped immediately, so its status is not updated.
        if self.connections.remove(address).is_some() {
            self.statistics.connections_closed += 1;
        }
        self.message_queues.remove(address);
    }

    /// Appends `message` to the queue for `destination`.
    ///
    /// # Errors
    ///
    /// Returns `Err("No connection to <destination>")` when `destination` has no queue,
    /// i.e. it was never connected or has been disconnected.
    pub fn send(&mut self, destination: &str, message: Message) -> Result<(), String> {
        let queue = self
            .message_queues
            .get_mut(destination)
            .ok_or_else(|| format!("No connection to {destination}"))?;
        queue.push_back(message);
        self.statistics.messages_sent += 1;
        Ok(())
    }

    /// Drains and returns every message queued under `source`, oldest first.
    ///
    /// Returns an empty `Vec` when `source` has no queue.
    pub fn receive(&mut self, source: &str) -> Vec<Message> {
        match self.message_queues.get_mut(source) {
            Some(queue) => {
                let messages: Vec<Message> = queue.drain(..).collect();
                self.statistics.messages_received += messages.len() as u64;
                messages
            }
            None => Vec::new(),
        }
    }
}

impl Default for CommunicationManager {
    /// Same as [`CommunicationManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A connection to a remote peer, stored in `CommunicationManager::connections`.
#[derive(Debug)]
pub struct Connection {
    /// Identifier of the form `conn_<n>`, assigned by `CommunicationManager::connect`.
    pub id: String,
    /// Peer address; `connect` also uses it as the key in `connections`.
    pub remote_address: String,
    /// Connection status; `connect` creates connections as `ConnectionStatus::Active`.
    pub status: ConnectionStatus,
    /// Per-connection counters, initialised to their defaults by `connect`.
    pub statistics: ConnectionStatistics,
}
/// Status of a [`Connection`].
///
/// `CommunicationManager::connect` creates connections as `Active`.
/// NOTE(review): in this file, no connection that remains stored is ever moved to another
/// variant; their meanings are implied only by their names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    Active,
    Connecting,
    Disconnected,
    Failed,
}
/// A message passed through the `CommunicationManager` queues.
#[derive(Debug, Clone)]
pub struct Message {
    /// Identifier generated when the message is created.
    pub id: String,
    /// Sender address as supplied to [`Message::new`].
    pub source: String,
    /// Recipient address as supplied to [`Message::new`].
    ///
    /// `CommunicationManager::send` routes using its own `destination` argument,
    /// not this field.
    pub destination: String,
    /// Category tag of the message.
    pub message_type: MessageType,
    /// Raw message body.
    pub payload: Vec<u8>,
    /// Monotonic creation time captured by [`Message::new`].
    pub timestamp: Instant,
}

impl Message {
    /// Creates a message with a freshly generated id, timestamped with `Instant::now()`.
    #[must_use]
    pub fn new(
        source: String,
        destination: String,
        message_type: MessageType,
        payload: Vec<u8>,
    ) -> Self {
        let id = uuid_simple();
        let timestamp = Instant::now();
        Self {
            id,
            source,
            destination,
            message_type,
            payload,
            timestamp,
        }
    }
}
/// Category tag carried by a [`Message`].
///
/// NOTE(review): no code in this file branches on the variant; their meanings are implied
/// only by their names — confirm with the message handlers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageType {
    TaskAssignment,
    TaskResult,
    Heartbeat,
    StatusUpdate,
    Error,
    Control,
}
/// Records node failures, per-node failure counts and per-task checkpoints.
#[derive(Debug, Clone, Default)]
pub struct FaultToleranceManager {
    /// Addresses currently marked as failed, in first-failure order.
    /// `record_failure` never adds an address twice.
    pub failed_nodes: Vec<String>,
    /// Per-address count, incremented on every `record_failure` call.
    /// NOTE(review): despite the name, this counts recorded failures, and
    /// `record_recovery` does not reset it.
    pub recovery_attempts: HashMap<String, u32>,
    /// Most recent checkpoint per task id.
    pub checkpoints: HashMap<String, Checkpoint>,
}

impl FaultToleranceManager {
    /// Creates an empty manager; equivalent to `FaultToleranceManager::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Marks `node_address` as failed and increments its counter in `recovery_attempts`.
    ///
    /// The address is appended to `failed_nodes` only if it is not already there, but the
    /// counter is incremented on every call.
    pub fn record_failure(&mut self, node_address: &str) {
        // Compare as `&str`, so no `String` is allocated just for the membership test.
        if !self.failed_nodes.iter().any(|n| n == node_address) {
            self.failed_nodes.push(node_address.to_owned());
        }
        *self
            .recovery_attempts
            .entry(node_address.to_owned())
            .or_default() += 1;
    }

    /// Removes `node_address` from `failed_nodes`; its counter in `recovery_attempts` is kept.
    pub fn record_recovery(&mut self, node_address: &str) {
        self.failed_nodes.retain(|n| n != node_address);
    }

    /// Stores `data` as the checkpoint for `task_id`, replacing any earlier one, timestamped now.
    pub fn create_checkpoint(&mut self, task_id: &str, data: Vec<u8>) {
        let checkpoint = Checkpoint {
            task_id: task_id.to_string(),
            data,
            timestamp: Instant::now(),
        };
        self.checkpoints.insert(task_id.to_string(), checkpoint);
    }

    /// Returns the latest checkpoint stored for `task_id`, if any.
    #[must_use]
    pub fn get_checkpoint(&self, task_id: &str) -> Option<&Checkpoint> {
        self.checkpoints.get(task_id)
    }
}

/// Snapshot of task state saved by [`FaultToleranceManager::create_checkpoint`].
#[derive(Debug, Clone)]
pub struct Checkpoint {
    /// Task the checkpoint belongs to (also its key in `checkpoints`).
    pub task_id: String,
    /// Opaque checkpoint bytes supplied by the caller.
    pub data: Vec<u8>,
    /// Monotonic time at which the checkpoint was created.
    pub timestamp: Instant,
}
/// Per-node execution statistics.
///
/// `ClusterManager::add_node` creates them with default values and
/// `ClusterManager::remove_node` removes them.
/// NOTE(review): none of these fields is updated in this file; presumably callers maintain
/// them — confirm, including whether `avg_task_time` is kept consistent with
/// `total_execution_time` and `tasks_completed`.
#[derive(Debug, Clone, Default)]
pub struct NodeStatistics {
    pub tasks_completed: u64,
    pub tasks_failed: u64,
    pub total_execution_time: std::time::Duration,
    pub avg_task_time: std::time::Duration,
}
/// Per-connection traffic counters, stored on each [`Connection`].
///
/// NOTE(review): `CommunicationManager` in this file only initialises these counters to
/// their defaults and never updates them; message counts are tracked in
/// `CommunicationStatistics` instead.
#[derive(Debug, Clone, Default)]
pub struct ConnectionStatistics {
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub messages_sent: u64,
    pub messages_received: u64,
    pub errors: u64,
}
/// Aggregate counters kept by `CommunicationManager`.
#[derive(Debug, Clone, Default)]
pub struct CommunicationStatistics {
    /// Incremented each time `connect` registers a new address.
    pub connections_established: u64,
    /// Incremented when `disconnect` removes a connection that existed.
    pub connections_closed: u64,
    /// Incremented for each message successfully queued by `send`.
    pub messages_sent: u64,
    /// Incremented for each message drained by `receive`.
    pub messages_received: u64,
    /// NOTE(review): not updated anywhere in this file.
    pub total_bytes: u64,
}
/// Generates a message identifier of the form `msg_<nanos-hex>_<seq-hex>`.
///
/// `<nanos-hex>` is the wall-clock time since the Unix epoch in nanoseconds (0 if the clock
/// reads earlier than the epoch). `<seq-hex>` is a per-process counter that makes every id
/// unique within the process, even when several ids are generated within the same clock tick.
fn uuid_simple() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::SystemTime;

    // A timestamp alone is not unique: two calls can read the same value (fast successive
    // calls or coarse clock resolution), which used to yield duplicate message ids.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    // `Relaxed` is sufficient: `fetch_add` is an atomic read-modify-write, so every caller
    // gets a distinct value, and no other memory is synchronised through this counter.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("msg_{now:x}_{seq:x}")
}