use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::{broadcast, mpsc, RwLock};
use uuid::Uuid;
use crate::ai::MultiModalMessage;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskPriority {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkAgent {
pub id: Uuid,
pub name: String,
pub address: SocketAddr,
pub capabilities: Vec<String>,
pub load: f32,
pub status: NetworkAgentStatus,
pub last_heartbeat: chrono::DateTime<chrono::Utc>,
#[serde(default)]
pub location: Option<GeoLocation>,
#[serde(default)]
pub cost_per_task: f64,
#[serde(default)]
pub latency_ms: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GeoLocation {
pub latitude: f64,
pub longitude: f64,
pub region: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkAgentStatus {
Available,
Busy,
Offline,
Error(String),
}
#[derive(Debug)]
pub struct DistributedSwarm {
pub id: Uuid,
pub network_agents: Arc<RwLock<HashMap<Uuid, NetworkAgent>>>,
pub task_queue: Arc<RwLock<Vec<DistributedTask>>>,
pub completed_tasks: Arc<RwLock<Vec<DistributedTask>>>,
pub coordinator: DistributedCoordinator,
pub listener: Option<TcpListener>,
pub message_tx: mpsc::UnboundedSender<NetworkMessage>,
pub message_rx: Arc<RwLock<mpsc::UnboundedReceiver<NetworkMessage>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTask {
pub id: Uuid,
pub description: String,
pub assigned_agent: Option<Uuid>,
pub priority: TaskPriority,
pub required_capabilities: Vec<String>,
pub input_data: MultiModalMessage,
pub result: Option<TaskResult>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub started_at: Option<chrono::DateTime<chrono::Utc>>,
pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
pub dependencies: Vec<Uuid>,
pub status: DistributedTaskStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DistributedTaskStatus {
Pending,
Assigned,
Running,
Completed,
Failed(String),
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
pub output: MultiModalMessage,
pub metadata: HashMap<String, String>,
pub execution_time: f64,
pub confidence_score: f32,
}
#[derive(Debug, Clone)]
pub struct DistributedCoordinator {
pub strategy: DistributionStrategy,
pub load_balancer: LoadBalancer,
pub failure_handler: FailureHandler,
pub coordinator_location: Option<GeoLocation>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DistributionStrategy {
RoundRobin,
LoadBalanced,
CapabilityBased,
GeographicProximity,
CostOptimized,
LatencyOptimized,
}
#[derive(Debug, Clone)]
pub struct LoadBalancer {
pub max_load_per_agent: f32,
pub rebalance_threshold: f32,
pub health_check_interval: u64,
}
#[derive(Debug, Clone)]
pub struct FailureHandler {
pub max_retries: u32,
pub retry_delay: u64,
pub fallback_strategy: FallbackStrategy,
}
#[derive(Debug, Clone)]
pub enum FallbackStrategy {
ReassignToOther,
RetryLocally,
SkipTask,
EscalateToHuman,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkMessage {
AgentRegistration {
agent: NetworkAgent,
},
TaskAssignment {
task: DistributedTask,
target_agent: Uuid,
},
TaskResult {
task_id: Uuid,
result: TaskResult,
},
Heartbeat {
agent_id: Uuid,
load: f32,
status: NetworkAgentStatus,
},
AgentShutdown {
agent_id: Uuid,
},
SwarmUpdate {
swarm_status: SwarmStatus,
},
Ping {
timestamp: u64,
},
Pong {
timestamp: u64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmStatus {
pub total_agents: usize,
pub active_tasks: usize,
pub completed_tasks: usize,
pub average_load: f32,
pub network_latency: f64,
}
async fn measure_latency(addr: SocketAddr) -> f64 {
let start = Instant::now();
match TcpStream::connect(addr).await {
Ok(mut stream) => {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let ping = NetworkMessage::Ping { timestamp };
if let Ok(bytes) = serde_json::to_vec(&ping) {
let len = bytes.len() as u32;
let _ = stream.write_all(&len.to_be_bytes()).await;
let _ = stream.write_all(&bytes).await;
}
start.elapsed().as_secs_f64() * 1000.0 }
Err(_) => {
f64::MAX }
}
}
fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
const EARTH_RADIUS_KM: f64 = 6371.0;
let lat1_rad = lat1.to_radians();
let lat2_rad = lat2.to_radians();
let delta_lat = (lat2 - lat1).to_radians();
let delta_lon = (lon2 - lon1).to_radians();
let a = (delta_lat / 2.0).sin().powi(2)
+ lat1_rad.cos() * lat2_rad.cos() * (delta_lon / 2.0).sin().powi(2);
let c = 2.0 * a.sqrt().asin();
EARTH_RADIUS_KM * c
}
impl DistributedSwarm {
pub async fn new(listen_addr: SocketAddr) -> Result<Self> {
let (message_tx, message_rx) = mpsc::unbounded_channel();
let listener = TcpListener::bind(listen_addr).await?;
Ok(Self {
id: Uuid::new_v4(),
network_agents: Arc::new(RwLock::new(HashMap::new())),
task_queue: Arc::new(RwLock::new(Vec::new())),
completed_tasks: Arc::new(RwLock::new(Vec::new())),
coordinator: DistributedCoordinator::new(),
listener: Some(listener),
message_tx,
message_rx: Arc::new(RwLock::new(message_rx)),
})
}
pub async fn start(&mut self) -> Result<()> {
if let Some(listener) = self.listener.take() {
let agents = Arc::clone(&self.network_agents);
let message_tx = self.message_tx.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
let agents_clone = Arc::clone(&agents);
let tx_clone = message_tx.clone();
tokio::spawn(handle_connection(stream, addr, agents_clone, tx_clone));
}
Err(e) => {
eprintln!("Failed to accept connection: {}", e);
}
}
}
});
}
self.start_task_distribution().await?;
Ok(())
}
pub async fn register_agent(&self, agent: NetworkAgent) -> Result<()> {
let mut agents = self.network_agents.write().await;
agents.insert(agent.id, agent.clone());
self.message_tx
.send(NetworkMessage::AgentRegistration { agent })?;
Ok(())
}
pub async fn assign_task(&self, task: DistributedTask) -> Result<()> {
let agents = self.network_agents.read().await;
let best_agent = self.coordinator.select_best_agent(&agents, &task).await?;
if let Some(agent_id) = best_agent {
let mut updated_task = task.clone();
updated_task.assigned_agent = Some(agent_id);
updated_task.status = DistributedTaskStatus::Assigned;
self.message_tx.send(NetworkMessage::TaskAssignment {
task: updated_task.clone(),
target_agent: agent_id,
})?;
let mut queue = self.task_queue.write().await;
queue.push(updated_task);
}
Ok(())
}
pub async fn get_metrics(&self) -> SwarmStatus {
let agents = self.network_agents.read().await;
let tasks = self.task_queue.read().await;
let completed = self.completed_tasks.read().await;
let total_agents = agents.len();
let active_tasks = tasks
.iter()
.filter(|t| matches!(t.status, DistributedTaskStatus::Running))
.count();
let completed_tasks = completed.len();
let average_load = if total_agents > 0 {
agents.values().map(|a| a.load).sum::<f32>() / total_agents as f32
} else {
0.0
};
let network_latency = if total_agents > 0 {
let latency_sum: f64 = agents.values().map(|a| a.latency_ms).sum();
latency_sum / total_agents as f64
} else {
0.0
};
SwarmStatus {
total_agents,
active_tasks,
completed_tasks,
average_load,
network_latency,
}
}
pub async fn update_latencies(&self) {
let agents_read = self.network_agents.read().await;
let addresses: Vec<_> = agents_read.values().map(|a| (a.id, a.address)).collect();
drop(agents_read);
for (agent_id, addr) in addresses {
let latency = measure_latency(addr).await;
if let Some(agent) = self.network_agents.write().await.get_mut(&agent_id) {
agent.latency_ms = latency;
}
}
}
async fn start_task_distribution(&self) -> Result<()> {
let queue = Arc::clone(&self.task_queue);
let agents = Arc::clone(&self.network_agents);
let coordinator = self.coordinator.clone();
let message_tx = self.message_tx.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
loop {
interval.tick().await;
let mut task_queue = queue.write().await;
let agents_read = agents.read().await;
for task in task_queue.iter_mut() {
if matches!(task.status, DistributedTaskStatus::Pending) {
if let Ok(Some(agent_id)) =
coordinator.select_best_agent(&agents_read, task).await
{
task.assigned_agent = Some(agent_id);
task.status = DistributedTaskStatus::Assigned;
let _ = message_tx.send(NetworkMessage::TaskAssignment {
task: task.clone(),
target_agent: agent_id,
});
}
}
}
}
});
Ok(())
}
pub async fn shutdown(&mut self) -> Result<()> {
self.network_agents.write().await.clear();
self.task_queue.write().await.clear();
self.completed_tasks.write().await.clear();
if let Some(_listener) = self.listener.take() {
}
Ok(())
}
}
impl DistributedCoordinator {
pub fn new() -> Self {
Self {
strategy: DistributionStrategy::LoadBalanced,
load_balancer: LoadBalancer {
max_load_per_agent: 0.8,
rebalance_threshold: 0.6,
health_check_interval: 30,
},
failure_handler: FailureHandler {
max_retries: 3,
retry_delay: 5000,
fallback_strategy: FallbackStrategy::ReassignToOther,
},
coordinator_location: None,
}
}
pub fn set_location(&mut self, location: GeoLocation) {
self.coordinator_location = Some(location);
}
pub async fn select_best_agent(
&self,
agents: &HashMap<Uuid, NetworkAgent>,
task: &DistributedTask,
) -> Result<Option<Uuid>> {
let available_agents: Vec<_> = agents
.values()
.filter(|agent| {
matches!(agent.status, NetworkAgentStatus::Available)
&& agent.load < self.load_balancer.max_load_per_agent
&& self.agent_has_capabilities(agent, &task.required_capabilities)
})
.collect();
if available_agents.is_empty() {
return Ok(None);
}
let selected = match self.strategy {
DistributionStrategy::RoundRobin => {
available_agents.first().map(|agent| agent.id)
}
DistributionStrategy::LoadBalanced => {
available_agents
.iter()
.min_by(|a, b| {
a.load
.partial_cmp(&b.load)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|agent| agent.id)
}
DistributionStrategy::CapabilityBased => {
available_agents
.iter()
.max_by_key(|agent| {
self.calculate_capability_score(agent, &task.required_capabilities)
})
.map(|agent| agent.id)
}
DistributionStrategy::GeographicProximity => {
if let Some(ref coord_loc) = self.coordinator_location {
available_agents
.iter()
.filter(|a| a.location.is_some())
.min_by(|a, b| {
let a_loc = a.location.as_ref().unwrap();
let b_loc = b.location.as_ref().unwrap();
let dist_a = haversine_distance(
coord_loc.latitude,
coord_loc.longitude,
a_loc.latitude,
a_loc.longitude,
);
let dist_b = haversine_distance(
coord_loc.latitude,
coord_loc.longitude,
b_loc.latitude,
b_loc.longitude,
);
dist_a
.partial_cmp(&dist_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|agent| agent.id)
} else {
available_agents
.iter()
.min_by(|a, b| {
a.load
.partial_cmp(&b.load)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|agent| agent.id)
}
}
DistributionStrategy::CostOptimized => {
available_agents
.iter()
.min_by(|a, b| {
a.cost_per_task
.partial_cmp(&b.cost_per_task)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|agent| agent.id)
}
DistributionStrategy::LatencyOptimized => {
available_agents
.iter()
.min_by(|a, b| {
a.latency_ms
.partial_cmp(&b.latency_ms)
.unwrap_or(std::cmp::Ordering::Equal)
})
.map(|agent| agent.id)
}
};
Ok(selected)
}
fn agent_has_capabilities(&self, agent: &NetworkAgent, required: &[String]) -> bool {
required.iter().all(|cap| agent.capabilities.contains(cap))
}
fn calculate_capability_score(&self, agent: &NetworkAgent, required: &[String]) -> usize {
required
.iter()
.filter(|cap| agent.capabilities.contains(cap))
.count()
}
}
async fn handle_connection(
stream: TcpStream,
addr: SocketAddr,
agents: Arc<RwLock<HashMap<Uuid, NetworkAgent>>>,
message_tx: mpsc::UnboundedSender<NetworkMessage>,
) {
tracing::info!("New connection from: {}", addr);
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
loop {
let mut len_buf = [0u8; 4];
if reader.read_exact(&mut len_buf).await.is_err() {
break; }
let msg_len = u32::from_be_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
if reader.read_exact(&mut msg_buf).await.is_err() {
break;
}
match serde_json::from_slice::<NetworkMessage>(&msg_buf) {
Ok(message) => {
match &message {
NetworkMessage::AgentRegistration { agent } => {
let mut agents_write = agents.write().await;
agents_write.insert(agent.id, agent.clone());
let _ = message_tx.send(message.clone());
let ack = serde_json::json!({"status": "registered", "agent_id": agent.id.to_string()});
let ack_bytes = serde_json::to_vec(&ack).unwrap();
let _ = writer
.write_all(&(ack_bytes.len() as u32).to_be_bytes())
.await;
let _ = writer.write_all(&ack_bytes).await;
}
NetworkMessage::Heartbeat {
agent_id,
load,
status,
} => {
let mut agents_write = agents.write().await;
if let Some(agent) = agents_write.get_mut(agent_id) {
agent.load = *load;
agent.status = status.clone();
agent.last_heartbeat = chrono::Utc::now();
}
let _ = message_tx.send(message.clone());
}
NetworkMessage::TaskResult { task_id, result } => {
let _ = message_tx.send(NetworkMessage::TaskResult {
task_id: *task_id,
result: result.clone(),
});
}
NetworkMessage::AgentShutdown { agent_id } => {
let mut agents_write = agents.write().await;
agents_write.remove(agent_id);
let _ = message_tx.send(message.clone());
}
NetworkMessage::Ping { timestamp } => {
let pong = NetworkMessage::Pong {
timestamp: *timestamp,
};
if let Ok(pong_bytes) = serde_json::to_vec(&pong) {
let _ = writer
.write_all(&(pong_bytes.len() as u32).to_be_bytes())
.await;
let _ = writer.write_all(&pong_bytes).await;
}
}
_ => {
let _ = message_tx.send(message);
}
}
}
Err(e) => {
tracing::warn!("Failed to parse message from {}: {}", addr, e);
}
}
}
tracing::info!("Connection closed from: {}", addr);
}
#[derive(Debug)]
pub struct SwarmClient {
pub agent_id: Uuid,
pub swarm_address: SocketAddr,
pub connection: Option<TcpStream>,
}
impl SwarmClient {
pub fn new(agent_id: Uuid, swarm_address: SocketAddr) -> Self {
Self {
agent_id,
swarm_address,
connection: None,
}
}
pub async fn connect(&mut self) -> Result<()> {
let stream = TcpStream::connect(self.swarm_address).await?;
self.connection = Some(stream);
Ok(())
}
async fn send_message(&mut self, message: &NetworkMessage) -> Result<()> {
if let Some(ref mut stream) = self.connection {
let msg_bytes = serde_json::to_vec(message)?;
stream
.write_all(&(msg_bytes.len() as u32).to_be_bytes())
.await?;
stream.write_all(&msg_bytes).await?;
stream.flush().await?;
Ok(())
} else {
Err(anyhow::anyhow!("Not connected to swarm"))
}
}
async fn read_response(&mut self) -> Result<serde_json::Value> {
if let Some(ref mut stream) = self.connection {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let msg_len = u32::from_be_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
stream.read_exact(&mut msg_buf).await?;
Ok(serde_json::from_slice(&msg_buf)?)
} else {
Err(anyhow::anyhow!("Not connected to swarm"))
}
}
pub async fn register_agent(&mut self, agent: NetworkAgent) -> Result<()> {
let message = NetworkMessage::AgentRegistration { agent };
self.send_message(&message).await?;
let response = self.read_response().await?;
if response.get("status").and_then(|s| s.as_str()) == Some("registered") {
Ok(())
} else {
Err(anyhow::anyhow!("Registration failed"))
}
}
pub async fn send_heartbeat(&mut self, load: f32, status: NetworkAgentStatus) -> Result<()> {
let message = NetworkMessage::Heartbeat {
agent_id: self.agent_id,
load,
status,
};
self.send_message(&message).await
}
pub async fn send_task_result(&mut self, task_id: Uuid, result: TaskResult) -> Result<()> {
let message = NetworkMessage::TaskResult { task_id, result };
self.send_message(&message).await
}
pub async fn shutdown(&mut self) -> Result<()> {
let message = NetworkMessage::AgentShutdown {
agent_id: self.agent_id,
};
self.send_message(&message).await?;
self.connection = None;
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceDiscoveryConfig {
pub service_name: String,
pub protocol: DiscoveryProtocol,
pub multicast_addr: SocketAddr,
pub broadcast_interval: u64,
pub stale_threshold: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DiscoveryProtocol {
Multicast,
StaticPeers(Vec<SocketAddr>),
Registry { url: String },
}
impl Default for ServiceDiscoveryConfig {
fn default() -> Self {
Self {
service_name: "aethershell-agent".to_string(),
protocol: DiscoveryProtocol::Multicast,
multicast_addr: "239.255.255.250:5353".parse().unwrap(),
broadcast_interval: 5,
stale_threshold: 30,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceAnnouncement {
pub service_name: String,
pub instance_id: Uuid,
pub address: SocketAddr,
pub capabilities: Vec<String>,
pub metadata: HashMap<String, String>,
pub timestamp: u64,
}
#[derive(Debug)]
pub struct ServiceDiscovery {
config: ServiceDiscoveryConfig,
local_instance: ServiceAnnouncement,
discovered_services: Arc<RwLock<HashMap<Uuid, ServiceAnnouncement>>>,
discovery_tx: broadcast::Sender<ServiceAnnouncement>,
running: Arc<RwLock<bool>>,
}
impl ServiceDiscovery {
pub fn new(
config: ServiceDiscoveryConfig,
instance_id: Uuid,
address: SocketAddr,
capabilities: Vec<String>,
) -> Self {
let (discovery_tx, _) = broadcast::channel(100);
Self {
local_instance: ServiceAnnouncement {
service_name: config.service_name.clone(),
instance_id,
address,
capabilities,
metadata: HashMap::new(),
timestamp: current_timestamp(),
},
config,
discovered_services: Arc::new(RwLock::new(HashMap::new())),
discovery_tx,
running: Arc::new(RwLock::new(false)),
}
}
pub async fn start(&self) -> Result<()> {
*self.running.write().await = true;
match &self.config.protocol {
DiscoveryProtocol::Multicast => {
self.start_multicast_discovery().await?;
}
DiscoveryProtocol::StaticPeers(peers) => {
self.probe_static_peers(peers.clone()).await?;
}
DiscoveryProtocol::Registry { url } => {
self.register_with_registry(url).await?;
}
}
Ok(())
}
async fn start_multicast_discovery(&self) -> Result<()> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
let multicast_addr = self.config.multicast_addr.ip();
if let std::net::IpAddr::V4(addr) = multicast_addr {
socket.join_multicast_v4(addr, std::net::Ipv4Addr::UNSPECIFIED)?;
}
let socket = Arc::new(socket);
let discovered = Arc::clone(&self.discovered_services);
let running = Arc::clone(&self.running);
let local_instance = self.local_instance.clone();
let multicast_addr = self.config.multicast_addr;
let broadcast_interval = self.config.broadcast_interval;
let stale_threshold = self.config.stale_threshold;
let discovery_tx = self.discovery_tx.clone();
let socket_clone = Arc::clone(&socket);
let running_clone = Arc::clone(&running);
let local_clone = local_instance.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(broadcast_interval));
while *running_clone.read().await {
interval.tick().await;
let mut announcement = local_clone.clone();
announcement.timestamp = current_timestamp();
if let Ok(bytes) = serde_json::to_vec(&announcement) {
let _ = socket_clone.send_to(&bytes, multicast_addr).await;
}
}
});
tokio::spawn(async move {
let mut buf = vec![0u8; 65535];
while *running.read().await {
if let Ok((len, _addr)) = socket.recv_from(&mut buf).await {
if let Ok(announcement) =
serde_json::from_slice::<ServiceAnnouncement>(&buf[..len])
{
if announcement.instance_id != local_instance.instance_id {
let mut services = discovered.write().await;
services.insert(announcement.instance_id, announcement.clone());
let _ = discovery_tx.send(announcement);
}
}
}
}
});
let discovered_clone = Arc::clone(&self.discovered_services);
let running_clone = Arc::clone(&self.running);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(stale_threshold / 2));
while *running_clone.read().await {
interval.tick().await;
let now = current_timestamp();
let mut services = discovered_clone.write().await;
services.retain(|_, s| now - s.timestamp < stale_threshold);
}
});
Ok(())
}
async fn probe_static_peers(&self, peers: Vec<SocketAddr>) -> Result<()> {
for peer in peers {
if let Ok(mut stream) = TcpStream::connect(peer).await {
let announcement = &self.local_instance;
if let Ok(bytes) = serde_json::to_vec(announcement) {
let _ = stream.write_all(&(bytes.len() as u32).to_be_bytes()).await;
let _ = stream.write_all(&bytes).await;
}
}
}
Ok(())
}
async fn register_with_registry(&self, url: &str) -> Result<()> {
#[cfg(feature = "native")]
{
let client = reqwest::Client::new();
let announcement = &self.local_instance;
client
.post(&format!("{}/services", url))
.json(announcement)
.send()
.await?;
}
Ok(())
}
pub async fn get_services(&self) -> Vec<ServiceAnnouncement> {
self.discovered_services
.read()
.await
.values()
.cloned()
.collect()
}
pub fn subscribe(&self) -> broadcast::Receiver<ServiceAnnouncement> {
self.discovery_tx.subscribe()
}
pub async fn stop(&self) {
*self.running.write().await = false;
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GossipMessage {
Join { member: ClusterMember },
Leave { member_id: Uuid },
Suspect { member_id: Uuid, reporter_id: Uuid },
Alive { member: ClusterMember },
SyncRequest { from: Uuid, state_version: u64 },
SyncResponse {
members: Vec<ClusterMember>,
state_version: u64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMember {
pub id: Uuid,
pub address: SocketAddr,
pub state: MemberState,
pub incarnation: u64,
pub metadata: HashMap<String, String>,
pub last_updated: u64,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum MemberState {
Alive,
Suspect,
Dead,
Left,
}
#[derive(Debug, Clone)]
pub struct GossipConfig {
pub fanout: usize,
pub gossip_interval: u64,
pub suspect_timeout: u64,
pub dead_timeout: u64,
pub indirect_checks: usize,
}
impl Default for GossipConfig {
fn default() -> Self {
Self {
fanout: 3,
gossip_interval: 200,
suspect_timeout: 2000,
dead_timeout: 5000,
indirect_checks: 3,
}
}
}
#[derive(Debug)]
pub struct GossipCluster {
local_member: ClusterMember,
members: Arc<RwLock<HashMap<Uuid, ClusterMember>>>,
config: GossipConfig,
gossip_queue: Arc<RwLock<Vec<GossipMessage>>>,
state_version: Arc<RwLock<u64>>,
event_tx: broadcast::Sender<ClusterEvent>,
running: Arc<RwLock<bool>>,
}
#[derive(Debug, Clone)]
pub enum ClusterEvent {
MemberJoined(ClusterMember),
MemberLeft(Uuid),
MemberSuspected(Uuid),
MemberDead(Uuid),
LeaderElected(Uuid),
}
impl GossipCluster {
pub fn new(id: Uuid, address: SocketAddr, config: GossipConfig) -> Self {
let (event_tx, _) = broadcast::channel(100);
let local_member = ClusterMember {
id,
address,
state: MemberState::Alive,
incarnation: 0,
metadata: HashMap::new(),
last_updated: current_timestamp(),
};
Self {
local_member,
members: Arc::new(RwLock::new(HashMap::new())),
config,
gossip_queue: Arc::new(RwLock::new(Vec::new())),
state_version: Arc::new(RwLock::new(0)),
event_tx,
running: Arc::new(RwLock::new(false)),
}
}
pub async fn start(&self, listen_addr: SocketAddr) -> Result<()> {
*self.running.write().await = true;
{
let mut members = self.members.write().await;
members.insert(self.local_member.id, self.local_member.clone());
}
let socket = Arc::new(UdpSocket::bind(listen_addr).await?);
let socket_clone = Arc::clone(&socket);
let members = Arc::clone(&self.members);
let gossip_queue = Arc::clone(&self.gossip_queue);
let running = Arc::clone(&self.running);
let event_tx = self.event_tx.clone();
let local_id = self.local_member.id;
let state_version = Arc::clone(&self.state_version);
tokio::spawn(async move {
let mut buf = vec![0u8; 65535];
while *running.read().await {
if let Ok((len, from_addr)) = socket_clone.recv_from(&mut buf).await {
if let Ok(msg) = serde_json::from_slice::<GossipMessage>(&buf[..len]) {
Self::handle_gossip_message(
msg,
from_addr,
&members,
&gossip_queue,
&event_tx,
local_id,
&state_version,
)
.await;
}
}
}
});
let socket_clone = Arc::clone(&socket);
let members_clone = Arc::clone(&self.members);
let gossip_queue_clone = Arc::clone(&self.gossip_queue);
let running_clone = Arc::clone(&self.running);
let gossip_interval = self.config.gossip_interval;
let fanout = self.config.fanout;
let local_member = self.local_member.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(gossip_interval));
while *running_clone.read().await {
interval.tick().await;
let members = members_clone.read().await;
let mut queue = gossip_queue_clone.write().await;
let targets: Vec<_> = members
.values()
.filter(|m| m.id != local_member.id && m.state == MemberState::Alive)
.take(fanout)
.cloned()
.collect();
for target in targets {
for msg in queue.iter() {
if let Ok(bytes) = serde_json::to_vec(msg) {
let _ = socket_clone.send_to(&bytes, target.address).await;
}
}
}
if queue.len() > 100 {
queue.drain(..50);
}
}
});
let members_clone = Arc::clone(&self.members);
let gossip_queue_clone = Arc::clone(&self.gossip_queue);
let running_clone = Arc::clone(&self.running);
let event_tx_clone = self.event_tx.clone();
let suspect_timeout = self.config.suspect_timeout;
let dead_timeout = self.config.dead_timeout;
let local_id = self.local_member.id;
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(suspect_timeout / 2));
while *running_clone.read().await {
interval.tick().await;
let now = current_timestamp();
let mut members = members_clone.write().await;
let mut queue = gossip_queue_clone.write().await;
for member in members.values_mut() {
if member.id == local_id {
continue;
}
let age = now.saturating_sub(member.last_updated);
match member.state {
MemberState::Alive if age > suspect_timeout => {
member.state = MemberState::Suspect;
queue.push(GossipMessage::Suspect {
member_id: member.id,
reporter_id: local_id,
});
let _ = event_tx_clone.send(ClusterEvent::MemberSuspected(member.id));
}
MemberState::Suspect if age > dead_timeout => {
member.state = MemberState::Dead;
let _ = event_tx_clone.send(ClusterEvent::MemberDead(member.id));
}
_ => {}
}
}
members.retain(|_, m| m.state != MemberState::Dead);
}
});
Ok(())
}
async fn handle_gossip_message(
msg: GossipMessage,
_from_addr: SocketAddr,
members: &Arc<RwLock<HashMap<Uuid, ClusterMember>>>,
queue: &Arc<RwLock<Vec<GossipMessage>>>,
event_tx: &broadcast::Sender<ClusterEvent>,
local_id: Uuid,
state_version: &Arc<RwLock<u64>>,
) {
match msg {
GossipMessage::Join { member } => {
let mut members = members.write().await;
if !members.contains_key(&member.id) {
members.insert(member.id, member.clone());
let _ = event_tx.send(ClusterEvent::MemberJoined(member.clone()));
queue.write().await.push(GossipMessage::Join { member });
}
}
GossipMessage::Leave { member_id } => {
let mut members = members.write().await;
if let Some(member) = members.get_mut(&member_id) {
member.state = MemberState::Left;
let _ = event_tx.send(ClusterEvent::MemberLeft(member_id));
}
queue.write().await.push(GossipMessage::Leave { member_id });
}
GossipMessage::Suspect {
member_id,
reporter_id,
} => {
if member_id == local_id {
let members = members.read().await;
if let Some(local) = members.get(&local_id) {
let mut refute = local.clone();
refute.incarnation += 1;
refute.last_updated = current_timestamp();
queue
.write()
.await
.push(GossipMessage::Alive { member: refute });
}
} else {
queue.write().await.push(GossipMessage::Suspect {
member_id,
reporter_id,
});
}
}
GossipMessage::Alive { member } => {
let mut members = members.write().await;
if let Some(existing) = members.get_mut(&member.id) {
if member.incarnation > existing.incarnation {
*existing = member.clone();
existing.state = MemberState::Alive;
}
} else {
members.insert(member.id, member.clone());
}
queue.write().await.push(GossipMessage::Alive { member });
}
GossipMessage::SyncRequest {
from,
state_version: req_version,
} => {
let current_version = *state_version.read().await;
if req_version < current_version {
let members = members.read().await;
let response = GossipMessage::SyncResponse {
members: members.values().cloned().collect(),
state_version: current_version,
};
let _ = response;
let _ = from;
}
}
GossipMessage::SyncResponse {
members: new_members,
state_version: new_version,
} => {
let mut current_version = state_version.write().await;
if new_version > *current_version {
let mut members = members.write().await;
for member in new_members {
members.insert(member.id, member);
}
*current_version = new_version;
}
}
}
}
pub async fn join(&self, seed_addr: SocketAddr) -> Result<()> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
let join_msg = GossipMessage::Join {
member: self.local_member.clone(),
};
let bytes = serde_json::to_vec(&join_msg)?;
socket.send_to(&bytes, seed_addr).await?;
let sync_msg = GossipMessage::SyncRequest {
from: self.local_member.id,
state_version: 0,
};
let bytes = serde_json::to_vec(&sync_msg)?;
socket.send_to(&bytes, seed_addr).await?;
Ok(())
}
pub async fn leave(&self) -> Result<()> {
let mut queue = self.gossip_queue.write().await;
queue.push(GossipMessage::Leave {
member_id: self.local_member.id,
});
tokio::time::sleep(Duration::from_millis(self.config.gossip_interval * 3)).await;
*self.running.write().await = false;
Ok(())
}
pub async fn get_members(&self) -> Vec<ClusterMember> {
self.members
.read()
.await
.values()
.filter(|m| m.state == MemberState::Alive)
.cloned()
.collect()
}
pub fn subscribe(&self) -> broadcast::Receiver<ClusterEvent> {
self.event_tx.subscribe()
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ElectionState {
Follower,
Candidate,
Leader,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ElectionMessage {
RequestVote {
term: u64,
candidate_id: Uuid,
last_log_index: u64,
last_log_term: u64,
},
VoteResponse {
term: u64,
vote_granted: bool,
},
Heartbeat {
term: u64,
leader_id: Uuid,
},
HeartbeatResponse {
term: u64,
success: bool,
},
}
#[derive(Debug, Clone)]
pub struct ElectionConfig {
pub election_timeout_min: u64,
pub election_timeout_max: u64,
pub heartbeat_interval: u64,
}
impl Default for ElectionConfig {
fn default() -> Self {
Self {
election_timeout_min: 150,
election_timeout_max: 300,
heartbeat_interval: 50,
}
}
}
#[derive(Debug)]
pub struct LeaderElection {
node_id: Uuid,
state: Arc<RwLock<ElectionState>>,
current_term: Arc<RwLock<u64>>,
voted_for: Arc<RwLock<Option<Uuid>>>,
current_leader: Arc<RwLock<Option<Uuid>>>,
config: ElectionConfig,
peers: Arc<RwLock<Vec<SocketAddr>>>,
event_tx: broadcast::Sender<ClusterEvent>,
running: Arc<RwLock<bool>>,
}
impl LeaderElection {
pub fn new(node_id: Uuid, config: ElectionConfig) -> Self {
let (event_tx, _) = broadcast::channel(100);
Self {
node_id,
state: Arc::new(RwLock::new(ElectionState::Follower)),
current_term: Arc::new(RwLock::new(0)),
voted_for: Arc::new(RwLock::new(None)),
current_leader: Arc::new(RwLock::new(None)),
config,
peers: Arc::new(RwLock::new(Vec::new())),
event_tx,
running: Arc::new(RwLock::new(false)),
}
}
pub async fn set_peers(&self, peers: Vec<SocketAddr>) {
*self.peers.write().await = peers;
}
pub async fn start(&self, listen_addr: SocketAddr) -> Result<()> {
*self.running.write().await = true;
let socket = Arc::new(UdpSocket::bind(listen_addr).await?);
let socket_clone = Arc::clone(&socket);
let state = Arc::clone(&self.state);
let current_term = Arc::clone(&self.current_term);
let voted_for = Arc::clone(&self.voted_for);
let current_leader = Arc::clone(&self.current_leader);
let running = Arc::clone(&self.running);
let node_id = self.node_id;
let event_tx = self.event_tx.clone();
tokio::spawn(async move {
let mut buf = vec![0u8; 65535];
while *running.read().await {
if let Ok((len, from_addr)) = socket_clone.recv_from(&mut buf).await {
if let Ok(msg) = serde_json::from_slice::<ElectionMessage>(&buf[..len]) {
let response = Self::handle_message(
msg,
&state,
¤t_term,
&voted_for,
¤t_leader,
node_id,
&event_tx,
)
.await;
if let Some(resp) = response {
if let Ok(bytes) = serde_json::to_vec(&resp) {
let _ = socket_clone.send_to(&bytes, from_addr).await;
}
}
}
}
}
});
let socket_clone = Arc::clone(&socket);
let state_clone = Arc::clone(&self.state);
let current_term_clone = Arc::clone(&self.current_term);
let voted_for_clone = Arc::clone(&self.voted_for);
let current_leader_clone = Arc::clone(&self.current_leader);
let peers_clone = Arc::clone(&self.peers);
let running_clone = Arc::clone(&self.running);
let config = self.config.clone();
let node_id = self.node_id;
let event_tx = self.event_tx.clone();
tokio::spawn(async move {
let mut last_heartbeat = Instant::now();
while *running_clone.read().await {
let state = state_clone.read().await.clone();
match state {
ElectionState::Leader => {
tokio::time::sleep(Duration::from_millis(config.heartbeat_interval)).await;
let term = *current_term_clone.read().await;
let heartbeat = ElectionMessage::Heartbeat {
term,
leader_id: node_id,
};
let bytes = serde_json::to_vec(&heartbeat).unwrap_or_default();
for peer in peers_clone.read().await.iter() {
let _ = socket_clone.send_to(&bytes, *peer).await;
}
}
ElectionState::Follower | ElectionState::Candidate => {
let timeout = rand::random::<u64>()
% (config.election_timeout_max - config.election_timeout_min)
+ config.election_timeout_min;
tokio::time::sleep(Duration::from_millis(timeout)).await;
if last_heartbeat.elapsed() > Duration::from_millis(timeout) {
*state_clone.write().await = ElectionState::Candidate;
let mut term = current_term_clone.write().await;
*term += 1;
*voted_for_clone.write().await = Some(node_id);
let request = ElectionMessage::RequestVote {
term: *term,
candidate_id: node_id,
last_log_index: 0,
last_log_term: 0,
};
let bytes = serde_json::to_vec(&request).unwrap_or_default();
let mut votes = 1; let peers = peers_clone.read().await.clone();
let needed = (peers.len() + 1) / 2 + 1;
for peer in &peers {
let _ = socket_clone.send_to(&bytes, *peer).await;
}
tokio::time::sleep(Duration::from_millis(50)).await;
if *state_clone.read().await == ElectionState::Candidate {
votes += peers.len() / 2;
if votes >= needed {
*state_clone.write().await = ElectionState::Leader;
*current_leader_clone.write().await = Some(node_id);
let _ = event_tx.send(ClusterEvent::LeaderElected(node_id));
}
}
}
last_heartbeat = Instant::now();
}
}
}
});
Ok(())
}
async fn handle_message(
msg: ElectionMessage,
state: &Arc<RwLock<ElectionState>>,
current_term: &Arc<RwLock<u64>>,
voted_for: &Arc<RwLock<Option<Uuid>>>,
current_leader: &Arc<RwLock<Option<Uuid>>>,
_node_id: Uuid,
_event_tx: &broadcast::Sender<ClusterEvent>,
) -> Option<ElectionMessage> {
match msg {
ElectionMessage::RequestVote {
term, candidate_id, ..
} => {
let mut my_term = current_term.write().await;
let mut my_vote = voted_for.write().await;
if term > *my_term {
*my_term = term;
*my_vote = None;
*state.write().await = ElectionState::Follower;
}
let vote_granted =
term >= *my_term && (my_vote.is_none() || *my_vote == Some(candidate_id));
if vote_granted {
*my_vote = Some(candidate_id);
}
Some(ElectionMessage::VoteResponse {
term: *my_term,
vote_granted,
})
}
ElectionMessage::Heartbeat { term, leader_id } => {
let mut my_term = current_term.write().await;
if term >= *my_term {
*my_term = term;
*state.write().await = ElectionState::Follower;
*current_leader.write().await = Some(leader_id);
}
Some(ElectionMessage::HeartbeatResponse {
term: *my_term,
success: term >= *my_term,
})
}
ElectionMessage::VoteResponse {
term,
vote_granted: _,
} => {
let my_term = *current_term.read().await;
if term > my_term {
*current_term.write().await = term;
*state.write().await = ElectionState::Follower;
}
None }
ElectionMessage::HeartbeatResponse { term, .. } => {
let my_term = *current_term.read().await;
if term > my_term {
*current_term.write().await = term;
*state.write().await = ElectionState::Follower;
}
None
}
}
}
pub async fn is_leader(&self) -> bool {
*self.state.read().await == ElectionState::Leader
}
pub async fn get_leader(&self) -> Option<Uuid> {
*self.current_leader.read().await
}
pub async fn get_state(&self) -> ElectionState {
self.state.read().await.clone()
}
pub fn subscribe(&self) -> broadcast::Receiver<ClusterEvent> {
self.event_tx.subscribe()
}
pub async fn stop(&self) {
*self.running.write().await = false;
}
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_distributed_swarm_creation() {
let addr = "127.0.0.1:0".parse().unwrap();
let swarm = DistributedSwarm::new(addr).await.unwrap();
assert!(!swarm.id.is_nil());
assert_eq!(swarm.network_agents.read().await.len(), 0);
}
#[tokio::test]
async fn test_agent_registration() {
let addr = "127.0.0.1:0".parse().unwrap();
let swarm = DistributedSwarm::new(addr).await.unwrap();
let agent = NetworkAgent {
id: Uuid::new_v4(),
name: "Test Agent".to_string(),
address: "127.0.0.1:8081".parse().unwrap(),
capabilities: vec!["text".to_string(), "image".to_string()],
load: 0.0,
status: NetworkAgentStatus::Available,
last_heartbeat: chrono::Utc::now(),
location: None,
cost_per_task: 0.01,
latency_ms: 0.0,
};
swarm.register_agent(agent.clone()).await.unwrap();
let agents = swarm.network_agents.read().await;
assert_eq!(agents.len(), 1);
assert!(agents.contains_key(&agent.id));
}
#[test]
fn test_capability_matching() {
let coordinator = DistributedCoordinator::new();
let agent = NetworkAgent {
id: Uuid::new_v4(),
name: "Test Agent".to_string(),
address: "127.0.0.1:8081".parse().unwrap(),
capabilities: vec!["text".to_string(), "image".to_string(), "audio".to_string()],
load: 0.0,
status: NetworkAgentStatus::Available,
last_heartbeat: chrono::Utc::now(),
location: None,
cost_per_task: 0.0,
latency_ms: 0.0,
};
let required = vec!["text".to_string(), "image".to_string()];
assert!(coordinator.agent_has_capabilities(&agent, &required));
let score = coordinator.calculate_capability_score(&agent, &required);
assert_eq!(score, 2);
}
#[test]
fn test_haversine_distance() {
let dist = haversine_distance(40.7128, -74.0060, 51.5074, -0.1278);
assert!(dist > 5500.0 && dist < 5700.0);
let dist = haversine_distance(0.0, 0.0, 0.0, 0.0);
assert!(dist < 0.001);
}
#[test]
fn test_service_discovery_config_default() {
let config = ServiceDiscoveryConfig::default();
assert_eq!(config.service_name, "aethershell-agent");
assert_eq!(config.broadcast_interval, 5);
assert_eq!(config.stale_threshold, 30);
}
#[test]
fn test_service_announcement_serialization() {
let announcement = ServiceAnnouncement {
service_name: "test-service".to_string(),
instance_id: Uuid::new_v4(),
address: "127.0.0.1:8080".parse().unwrap(),
capabilities: vec!["text".to_string(), "image".to_string()],
metadata: HashMap::new(),
timestamp: current_timestamp(),
};
let json = serde_json::to_string(&announcement).unwrap();
let deserialized: ServiceAnnouncement = serde_json::from_str(&json).unwrap();
assert_eq!(announcement.service_name, deserialized.service_name);
assert_eq!(announcement.instance_id, deserialized.instance_id);
}
#[tokio::test]
async fn test_service_discovery_creation() {
let config = ServiceDiscoveryConfig::default();
let instance_id = Uuid::new_v4();
let address = "127.0.0.1:8080".parse().unwrap();
let capabilities = vec!["text".to_string()];
let discovery = ServiceDiscovery::new(config, instance_id, address, capabilities);
assert_eq!(discovery.local_instance.instance_id, instance_id);
assert_eq!(discovery.local_instance.address, address);
}
#[test]
fn test_gossip_config_default() {
let config = GossipConfig::default();
assert_eq!(config.fanout, 3);
assert_eq!(config.gossip_interval, 200);
assert_eq!(config.suspect_timeout, 2000);
assert_eq!(config.dead_timeout, 5000);
}
#[test]
fn test_cluster_member_creation() {
let member = ClusterMember {
id: Uuid::new_v4(),
address: "127.0.0.1:8080".parse().unwrap(),
state: MemberState::Alive,
incarnation: 0,
metadata: HashMap::new(),
last_updated: current_timestamp(),
};
assert_eq!(member.state, MemberState::Alive);
assert_eq!(member.incarnation, 0);
}
#[test]
fn test_gossip_message_serialization() {
let member = ClusterMember {
id: Uuid::new_v4(),
address: "127.0.0.1:8080".parse().unwrap(),
state: MemberState::Alive,
incarnation: 1,
metadata: HashMap::new(),
last_updated: current_timestamp(),
};
let msg = GossipMessage::Join {
member: member.clone(),
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: GossipMessage = serde_json::from_str(&json).unwrap();
if let GossipMessage::Join { member: m } = deserialized {
assert_eq!(m.id, member.id);
assert_eq!(m.incarnation, 1);
} else {
panic!("Expected Join message");
}
}
#[tokio::test]
async fn test_gossip_cluster_creation() {
let id = Uuid::new_v4();
let address = "127.0.0.1:8080".parse().unwrap();
let config = GossipConfig::default();
let cluster = GossipCluster::new(id, address, config);
assert_eq!(cluster.local_member.id, id);
assert_eq!(cluster.local_member.address, address);
assert_eq!(cluster.local_member.state, MemberState::Alive);
}
#[tokio::test]
async fn test_gossip_cluster_get_members() {
let id = Uuid::new_v4();
let address = "127.0.0.1:8080".parse().unwrap();
let config = GossipConfig::default();
let cluster = GossipCluster::new(id, address, config);
let members = cluster.get_members().await;
assert!(members.is_empty());
}
#[test]
fn test_election_config_default() {
let config = ElectionConfig::default();
assert_eq!(config.election_timeout_min, 150);
assert_eq!(config.election_timeout_max, 300);
assert_eq!(config.heartbeat_interval, 50);
}
#[test]
fn test_election_state_enum() {
let state = ElectionState::Follower;
assert_eq!(state, ElectionState::Follower);
let json = serde_json::to_string(&state).unwrap();
let deserialized: ElectionState = serde_json::from_str(&json).unwrap();
assert_eq!(state, deserialized);
}
#[test]
fn test_election_message_serialization() {
let msg = ElectionMessage::RequestVote {
term: 1,
candidate_id: Uuid::new_v4(),
last_log_index: 0,
last_log_term: 0,
};
let json = serde_json::to_string(&msg).unwrap();
let deserialized: ElectionMessage = serde_json::from_str(&json).unwrap();
if let ElectionMessage::RequestVote { term, .. } = deserialized {
assert_eq!(term, 1);
} else {
panic!("Expected RequestVote message");
}
}
#[tokio::test]
async fn test_leader_election_creation() {
let node_id = Uuid::new_v4();
let config = ElectionConfig::default();
let election = LeaderElection::new(node_id, config);
assert!(!election.is_leader().await);
assert!(election.get_leader().await.is_none());
assert_eq!(election.get_state().await, ElectionState::Follower);
}
#[tokio::test]
async fn test_leader_election_set_peers() {
let node_id = Uuid::new_v4();
let config = ElectionConfig::default();
let election = LeaderElection::new(node_id, config);
let peers = vec![
"127.0.0.1:8001".parse().unwrap(),
"127.0.0.1:8002".parse().unwrap(),
];
election.set_peers(peers.clone()).await;
let stored_peers = election.peers.read().await;
assert_eq!(stored_peers.len(), 2);
}
#[test]
fn test_member_state_comparison() {
assert_eq!(MemberState::Alive, MemberState::Alive);
assert_ne!(MemberState::Alive, MemberState::Dead);
assert_ne!(MemberState::Suspect, MemberState::Left);
}
#[test]
fn test_discovery_protocol_variants() {
let multicast = DiscoveryProtocol::Multicast;
let static_peers = DiscoveryProtocol::StaticPeers(vec!["127.0.0.1:8080".parse().unwrap()]);
let registry = DiscoveryProtocol::Registry {
url: "http://localhost:8500".to_string(),
};
let _ = multicast;
let _ = static_peers;
let _ = registry;
}
#[tokio::test]
async fn test_cluster_event_broadcast() {
let id = Uuid::new_v4();
let address = "127.0.0.1:8080".parse().unwrap();
let config = GossipConfig::default();
let cluster = GossipCluster::new(id, address, config);
let mut receiver = cluster.subscribe();
assert!(receiver.try_recv().is_err()); }
#[test]
fn test_current_timestamp() {
let ts1 = current_timestamp();
std::thread::sleep(std::time::Duration::from_millis(10));
let ts2 = current_timestamp();
assert!(ts2 >= ts1);
}
}