use crate::consensus::ConsensusManager;
use crate::discovery::DiscoveryService;
use crate::error::{ClusterError, Result};
use crate::health_monitor::{HealthMonitor, NodeHealth};
use crate::raft::OxirsNodeId;
use crate::replication::ReplicationManager;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, timeout};
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeState {
Starting,
Active,
Degraded,
Suspended,
Draining,
Leaving,
Left,
Failed,
Suspected,
}
impl NodeState {
pub fn is_operational(self) -> bool {
matches!(self, NodeState::Active | NodeState::Degraded)
}
pub fn is_consensus_eligible(self) -> bool {
matches!(
self,
NodeState::Active | NodeState::Degraded | NodeState::Draining
)
}
pub fn is_healthy(self) -> bool {
matches!(self, NodeState::Active)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LifecycleEvent {
NodeJoining {
node_id: OxirsNodeId,
address: SocketAddr,
metadata: NodeMetadata,
},
NodeJoined {
node_id: OxirsNodeId,
timestamp: u64,
},
NodeStateChanged {
node_id: OxirsNodeId,
old_state: NodeState,
new_state: NodeState,
reason: String,
timestamp: u64,
},
NodeLeaving {
node_id: OxirsNodeId,
reason: String,
graceful: bool,
},
NodeLeft {
node_id: OxirsNodeId,
timestamp: u64,
},
NodeEvicted {
node_id: OxirsNodeId,
reason: String,
timestamp: u64,
},
MembershipChanged {
added_nodes: Vec<OxirsNodeId>,
removed_nodes: Vec<OxirsNodeId>,
timestamp: u64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetadata {
pub capabilities: Vec<String>,
pub datacenter: Option<String>,
pub rack: Option<String>,
pub version: String,
pub tags: HashMap<String, String>,
pub capacity: ResourceCapacity,
}
impl Default for NodeMetadata {
fn default() -> Self {
Self {
capabilities: vec!["raft".to_string(), "rdf".to_string()],
datacenter: None,
rack: None,
version: env!("CARGO_PKG_VERSION").to_string(),
tags: HashMap::new(),
capacity: ResourceCapacity::default(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceCapacity {
pub cpu_cores: u32,
pub memory_bytes: u64,
pub disk_bytes: u64,
pub network_bandwidth: u64,
pub max_connections: u32,
}
impl Default for ResourceCapacity {
fn default() -> Self {
Self {
cpu_cores: num_cpus::get() as u32,
memory_bytes: 8 * 1024 * 1024 * 1024, disk_bytes: 100 * 1024 * 1024 * 1024, network_bandwidth: 1_000_000_000, max_connections: 10000,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LifecycleConfig {
pub health_check_interval: Duration,
pub failure_timeout: Duration,
pub graceful_shutdown_timeout: Duration,
pub removal_grace_period: Duration,
pub enable_auto_failure_detection: bool,
pub enable_auto_recovery: bool,
pub min_cluster_size: usize,
pub join_timeout: Duration,
pub enable_byzantine_detection: bool,
}
impl Default for LifecycleConfig {
fn default() -> Self {
Self {
health_check_interval: Duration::from_secs(30),
failure_timeout: Duration::from_secs(120),
graceful_shutdown_timeout: Duration::from_secs(300),
removal_grace_period: Duration::from_secs(600),
enable_auto_failure_detection: true,
enable_auto_recovery: true,
min_cluster_size: 3,
join_timeout: Duration::from_secs(60),
enable_byzantine_detection: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
pub node_id: OxirsNodeId,
pub state: NodeState,
pub address: SocketAddr,
pub metadata: NodeMetadata,
pub last_seen: u64,
pub health: NodeHealth,
pub joined_at: u64,
pub state_history: Vec<(NodeState, u64, String)>,
pub performance_score: f64,
pub failure_count: u32,
}
impl NodeStatus {
pub fn new(node_id: OxirsNodeId, address: SocketAddr, metadata: NodeMetadata) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs();
Self {
node_id,
state: NodeState::Starting,
address,
metadata,
last_seen: now,
health: NodeHealth::default(),
joined_at: now,
state_history: vec![(NodeState::Starting, now, "Initial state".to_string())],
performance_score: 1.0,
failure_count: 0,
}
}
pub fn update_state(&mut self, new_state: NodeState, reason: String) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs();
self.state_history.push((new_state, now, reason));
if self.state_history.len() > 10 {
self.state_history.remove(0);
}
self.state = new_state;
self.last_seen = now;
if matches!(new_state, NodeState::Active) {
self.failure_count = 0;
}
}
pub fn update_health(&mut self, health: NodeHealth) {
self.health = health;
self.last_seen = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs();
self.performance_score = self.calculate_performance_score();
}
fn calculate_performance_score(&self) -> f64 {
let mut score: f64 = 1.0;
if self.health.system_metrics.cpu_usage > 0.8 {
score *= 0.7;
} else if self.health.system_metrics.cpu_usage > 0.6 {
score *= 0.9;
}
if self.health.system_metrics.memory_usage > 0.9 {
score *= 0.6;
} else if self.health.system_metrics.memory_usage > 0.7 {
score *= 0.8;
}
if self.health.response_time > Duration::from_millis(1000) {
score *= 0.7;
} else if self.health.response_time > Duration::from_millis(500) {
score *= 0.9;
}
if self.failure_count > 5 {
score *= 0.5;
} else if self.failure_count > 2 {
score *= 0.8;
}
score.clamp(0.0, 1.0)
}
pub fn is_stale(&self, timeout: Duration) -> bool {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs();
now.saturating_sub(self.last_seen) > timeout.as_secs()
}
pub fn record_failure(&mut self) {
self.failure_count += 1;
self.performance_score = self.calculate_performance_score();
}
}
pub struct NodeLifecycleManager {
local_node_id: OxirsNodeId,
config: LifecycleConfig,
consensus: Arc<ConsensusManager>,
discovery: Arc<DiscoveryService>,
health_monitor: Arc<HealthMonitor>,
replication: Arc<ReplicationManager>,
node_status: Arc<RwLock<HashMap<OxirsNodeId, NodeStatus>>>,
event_listeners: Arc<RwLock<Vec<tokio::sync::mpsc::UnboundedSender<LifecycleEvent>>>>,
task_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl NodeLifecycleManager {
pub fn new(
local_node_id: OxirsNodeId,
config: LifecycleConfig,
consensus: Arc<ConsensusManager>,
discovery: Arc<DiscoveryService>,
health_monitor: Arc<HealthMonitor>,
replication: Arc<ReplicationManager>,
) -> Self {
Self {
local_node_id,
config,
consensus,
discovery,
health_monitor,
replication,
node_status: Arc::new(RwLock::new(HashMap::new())),
event_listeners: Arc::new(RwLock::new(Vec::new())),
task_handle: Arc::new(Mutex::new(None)),
}
}
pub async fn start(&self) -> Result<()> {
info!(
"Starting node lifecycle manager for node {}",
self.local_node_id
);
let task_handle = {
let manager = self.clone();
tokio::spawn(async move {
manager.monitoring_loop().await;
})
};
let mut handle = self.task_handle.lock().await;
*handle = Some(task_handle);
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping node lifecycle manager");
let mut handle = self.task_handle.lock().await;
if let Some(task) = handle.take() {
task.abort();
}
Ok(())
}
pub async fn add_node(
&self,
node_id: OxirsNodeId,
address: SocketAddr,
metadata: NodeMetadata,
) -> Result<()> {
info!("Adding node {} at {} to cluster", node_id, address);
{
let nodes = self.node_status.read().await;
if nodes.contains_key(&node_id) {
return Err(ClusterError::Config(format!(
"Node {node_id} already exists"
)));
}
}
let node_status = NodeStatus::new(node_id, address, metadata.clone());
self.emit_event(LifecycleEvent::NodeJoining {
node_id,
address,
metadata,
})
.await;
let join_result = timeout(
self.config.join_timeout,
self.perform_node_join(node_id, address, node_status),
)
.await;
match join_result {
Ok(Ok(())) => {
info!("Successfully added node {} to cluster", node_id);
self.emit_event(LifecycleEvent::NodeJoined {
node_id,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs(),
})
.await;
Ok(())
}
Ok(Err(e)) => {
error!("Failed to add node {}: {}", node_id, e);
self.cleanup_failed_join(node_id).await;
Err(e)
}
Err(_) => {
error!("Node join timed out for node {}", node_id);
self.cleanup_failed_join(node_id).await;
Err(ClusterError::Other("Node join timed out".to_string()))
}
}
}
async fn perform_node_join(
&self,
node_id: OxirsNodeId,
address: SocketAddr,
mut node_status: NodeStatus,
) -> Result<()> {
node_status.update_state(NodeState::Active, "Successfully joined cluster".to_string());
{
let mut nodes = self.node_status.write().await;
nodes.insert(node_id, node_status);
}
self.health_monitor
.start_monitoring(node_id, address.to_string())
.await;
Ok(())
}
async fn cleanup_failed_join(&self, node_id: OxirsNodeId) {
warn!("Cleaning up failed join for node {}", node_id);
let mut nodes = self.node_status.write().await;
nodes.remove(&node_id);
}
pub async fn remove_node(&self, node_id: OxirsNodeId, graceful: bool) -> Result<()> {
info!(
"Removing node {} from cluster (graceful: {})",
node_id, graceful
);
let node_exists = {
let nodes = self.node_status.read().await;
nodes.contains_key(&node_id)
};
if !node_exists {
return Err(ClusterError::Config(format!("Node {node_id} not found")));
}
self.emit_event(LifecycleEvent::NodeLeaving {
node_id,
reason: if graceful {
"Graceful shutdown"
} else {
"Forced removal"
}
.to_string(),
graceful,
})
.await;
if graceful {
self.perform_graceful_removal(node_id).await?;
} else {
self.perform_immediate_removal(node_id).await?;
}
self.emit_event(LifecycleEvent::NodeLeft {
node_id,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs(),
})
.await;
info!("Successfully removed node {} from cluster", node_id);
Ok(())
}
async fn perform_graceful_removal(&self, node_id: OxirsNodeId) -> Result<()> {
{
let mut nodes = self.node_status.write().await;
if let Some(node_status) = nodes.get_mut(&node_id) {
node_status.update_state(
NodeState::Draining,
"Graceful removal initiated".to_string(),
);
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
self.perform_immediate_removal(node_id).await
}
async fn perform_immediate_removal(&self, node_id: OxirsNodeId) -> Result<()> {
self.health_monitor.stop_monitoring(node_id).await;
{
let mut nodes = self.node_status.write().await;
if let Some(node_status) = nodes.get_mut(&node_id) {
node_status.update_state(NodeState::Left, "Removed from cluster".to_string());
}
}
let nodes = Arc::clone(&self.node_status);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(300)).await; let mut nodes = nodes.write().await;
nodes.remove(&node_id);
});
Ok(())
}
pub async fn force_evict_node(&self, node_id: OxirsNodeId, reason: String) -> Result<()> {
warn!("Force evicting node {}: {}", node_id, reason);
{
let mut nodes = self.node_status.write().await;
if let Some(node_status) = nodes.get_mut(&node_id) {
node_status.update_state(NodeState::Failed, reason.clone());
}
}
self.emit_event(LifecycleEvent::NodeEvicted {
node_id,
reason: reason.clone(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs(),
})
.await;
self.perform_immediate_removal(node_id).await?;
info!("Successfully evicted node {}", node_id);
Ok(())
}
pub async fn get_node_status(&self, node_id: OxirsNodeId) -> Option<NodeStatus> {
let nodes = self.node_status.read().await;
nodes.get(&node_id).cloned()
}
pub async fn get_all_node_statuses(&self) -> HashMap<OxirsNodeId, NodeStatus> {
let nodes = self.node_status.read().await;
nodes.clone()
}
pub async fn get_healthy_nodes(&self) -> Vec<OxirsNodeId> {
let nodes = self.node_status.read().await;
nodes
.values()
.filter(|status| status.state.is_healthy())
.map(|status| status.node_id)
.collect()
}
pub async fn get_operational_nodes(&self) -> Vec<OxirsNodeId> {
let nodes = self.node_status.read().await;
nodes
.values()
.filter(|status| status.state.is_operational())
.map(|status| status.node_id)
.collect()
}
pub async fn check_cluster_health(&self) -> ClusterHealthStatus {
let nodes = self.node_status.read().await;
let total_nodes = nodes.len();
let healthy_nodes = nodes.values().filter(|s| s.state.is_healthy()).count();
let operational_nodes = nodes.values().filter(|s| s.state.is_operational()).count();
let failed_nodes = nodes
.values()
.filter(|s| matches!(s.state, NodeState::Failed))
.count();
let health_ratio = if total_nodes > 0 {
healthy_nodes as f64 / total_nodes as f64
} else {
1.0
};
let status = if health_ratio >= 0.8 {
ClusterHealth::Healthy
} else if health_ratio >= 0.6 {
ClusterHealth::Degraded
} else if operational_nodes >= self.config.min_cluster_size {
ClusterHealth::Unstable
} else {
ClusterHealth::Critical
};
ClusterHealthStatus {
status,
total_nodes,
healthy_nodes,
operational_nodes,
failed_nodes,
health_ratio,
min_cluster_size: self.config.min_cluster_size,
}
}
async fn monitoring_loop(&self) {
let mut interval = interval(self.config.health_check_interval);
loop {
interval.tick().await;
if let Err(e) = self.perform_health_check().await {
error!("Health check failed: {}", e);
}
if self.config.enable_auto_failure_detection {
if let Err(e) = self.detect_and_handle_failures().await {
error!("Failure detection failed: {}", e);
}
}
if self.config.enable_byzantine_detection {
if let Err(e) = self.detect_byzantine_behavior().await {
error!("Byzantine detection failed: {}", e);
}
}
}
}
async fn perform_health_check(&self) -> Result<()> {
debug!("Performing health check on all nodes");
let node_ids: Vec<OxirsNodeId> = {
let nodes = self.node_status.read().await;
nodes.keys().copied().collect()
};
for node_id in node_ids {
if let Some(health_status) = self.health_monitor.get_node_health(node_id).await {
let mut nodes = self.node_status.write().await;
if let Some(node_status) = nodes.get_mut(&node_id) {
node_status.update_health(health_status.health);
let new_state = if node_status.health.status
== crate::health_monitor::NodeHealthLevel::Healthy
{
NodeState::Active
} else if node_status.health.system_metrics.cpu_usage > 0.9
|| node_status.health.system_metrics.memory_usage > 0.95
{
NodeState::Degraded
} else {
node_status.state };
if new_state != node_status.state {
let old_state = node_status.state;
node_status
.update_state(new_state, "Health check state change".to_string());
self.emit_event(LifecycleEvent::NodeStateChanged {
node_id,
old_state,
new_state,
reason: "Health check state change".to_string(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs(),
})
.await;
}
}
}
}
Ok(())
}
async fn detect_and_handle_failures(&self) -> Result<()> {
debug!("Detecting node failures");
let failed_nodes: Vec<OxirsNodeId> = {
let nodes = self.node_status.read().await;
nodes
.values()
.filter(|status| {
status.is_stale(self.config.failure_timeout)
&& !matches!(status.state, NodeState::Failed | NodeState::Left)
})
.map(|status| status.node_id)
.collect()
};
for node_id in failed_nodes {
warn!("Detected failed node: {}", node_id);
{
let mut nodes = self.node_status.write().await;
if let Some(node_status) = nodes.get_mut(&node_id) {
let old_state = node_status.state;
node_status
.update_state(NodeState::Failed, "Node failure detected".to_string());
node_status.record_failure();
self.emit_event(LifecycleEvent::NodeStateChanged {
node_id,
old_state,
new_state: NodeState::Failed,
reason: "Node failure detected".to_string(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs(),
})
.await;
}
}
if self.config.enable_auto_recovery {
let manager = self.clone();
tokio::spawn(async move {
tokio::time::sleep(manager.config.removal_grace_period).await;
if let Some(status) = manager.get_node_status(node_id).await {
if matches!(status.state, NodeState::Failed) {
if let Err(e) = manager
.force_evict_node(node_id, "Auto-recovery eviction".to_string())
.await
{
error!("Failed to auto-evict node {}: {}", node_id, e);
}
}
}
});
}
}
Ok(())
}
async fn detect_byzantine_behavior(&self) -> Result<()> {
debug!("Checking for Byzantine behavior");
let suspected_nodes: Vec<OxirsNodeId> = {
let nodes = self.node_status.read().await;
nodes
.values()
.filter(|status| {
status.failure_count > 10 || status.performance_score < 0.2 })
.map(|status| status.node_id)
.collect()
};
for node_id in suspected_nodes {
warn!("Suspected Byzantine behavior from node: {}", node_id);
{
let mut nodes = self.node_status.write().await;
if let Some(node_status) = nodes.get_mut(&node_id) {
let old_state = node_status.state;
node_status.update_state(
NodeState::Suspected,
"Byzantine behavior detected".to_string(),
);
self.emit_event(LifecycleEvent::NodeStateChanged {
node_id,
old_state,
new_state: NodeState::Suspected,
reason: "Byzantine behavior detected".to_string(),
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs(),
})
.await;
}
}
}
Ok(())
}
async fn emit_event(&self, event: LifecycleEvent) {
let listeners = self.event_listeners.read().await;
for listener in listeners.iter() {
if listener.send(event.clone()).is_err() {
}
}
}
pub async fn subscribe_events(&self) -> tokio::sync::mpsc::UnboundedReceiver<LifecycleEvent> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let mut listeners = self.event_listeners.write().await;
listeners.push(sender);
receiver
}
pub async fn transfer_leadership(&self, target_node: Option<OxirsNodeId>) -> Result<()> {
info!("Transferring leadership before shutdown");
if let Some(target) = target_node {
self.consensus
.transfer_leadership(target)
.await
.map_err(|e| ClusterError::Other(format!("Leadership transfer failed: {e}")))?;
} else {
let healthy_nodes = self.get_healthy_nodes().await;
if let Some(&target) = healthy_nodes.first() {
if target != self.local_node_id {
self.consensus
.transfer_leadership(target)
.await
.map_err(|e| {
ClusterError::Other(format!("Leadership transfer failed: {e}"))
})?;
}
}
}
info!("Leadership transfer completed");
Ok(())
}
pub async fn graceful_shutdown(&self) -> Result<()> {
info!("Initiating graceful shutdown of local node");
if self.consensus.is_leader().await {
self.transfer_leadership(None).await?;
}
self.remove_node(self.local_node_id, true).await?;
self.stop().await?;
info!("Graceful shutdown completed");
Ok(())
}
}
impl Clone for NodeLifecycleManager {
fn clone(&self) -> Self {
Self {
local_node_id: self.local_node_id,
config: self.config.clone(),
consensus: Arc::clone(&self.consensus),
discovery: Arc::clone(&self.discovery),
health_monitor: Arc::clone(&self.health_monitor),
replication: Arc::clone(&self.replication),
node_status: Arc::clone(&self.node_status),
event_listeners: Arc::clone(&self.event_listeners),
task_handle: Arc::clone(&self.task_handle),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterHealthStatus {
pub status: ClusterHealth,
pub total_nodes: usize,
pub healthy_nodes: usize,
pub operational_nodes: usize,
pub failed_nodes: usize,
pub health_ratio: f64,
pub min_cluster_size: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ClusterHealth {
Healthy,
Degraded,
Unstable,
Critical,
}
impl ClusterHealth {
pub fn is_operational(self) -> bool {
!matches!(self, ClusterHealth::Critical)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr};
#[test]
fn test_node_state_properties() {
assert!(NodeState::Active.is_operational());
assert!(NodeState::Active.is_consensus_eligible());
assert!(NodeState::Active.is_healthy());
assert!(NodeState::Degraded.is_operational());
assert!(NodeState::Degraded.is_consensus_eligible());
assert!(!NodeState::Degraded.is_healthy());
assert!(!NodeState::Failed.is_operational());
assert!(!NodeState::Failed.is_consensus_eligible());
assert!(!NodeState::Failed.is_healthy());
}
#[test]
fn test_node_status_performance_scoring() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let metadata = NodeMetadata::default();
let mut status = NodeStatus::new(1, addr, metadata);
assert!(status.performance_score > 0.8);
status.record_failure();
status.record_failure();
status.record_failure();
assert!(status.performance_score < 0.9);
for _ in 0..10 {
status.record_failure();
}
assert!(status.performance_score < 0.6);
}
#[test]
fn test_node_status_state_history() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let metadata = NodeMetadata::default();
let mut status = NodeStatus::new(1, addr, metadata);
assert_eq!(status.state, NodeState::Starting);
assert_eq!(status.state_history.len(), 1);
status.update_state(NodeState::Active, "Joined cluster".to_string());
status.update_state(NodeState::Degraded, "High CPU usage".to_string());
status.update_state(NodeState::Active, "CPU usage normalized".to_string());
assert_eq!(status.state, NodeState::Active);
assert_eq!(status.state_history.len(), 4);
assert_eq!(status.state_history.last().unwrap().0, NodeState::Active);
}
#[test]
fn test_node_status_staleness() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let metadata = NodeMetadata::default();
let status = NodeStatus::new(1, addr, metadata);
assert!(!status.is_stale(Duration::from_secs(60)));
let mut old_status = status;
old_status.last_seen = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after UNIX_EPOCH")
.as_secs()
- 120;
assert!(old_status.is_stale(Duration::from_secs(60)));
}
#[test]
fn test_resource_capacity_defaults() {
let capacity = ResourceCapacity::default();
assert!(capacity.cpu_cores > 0);
assert!(capacity.memory_bytes > 0);
assert!(capacity.disk_bytes > 0);
assert!(capacity.network_bandwidth > 0);
assert!(capacity.max_connections > 0);
}
#[test]
fn test_cluster_health_operational() {
assert!(ClusterHealth::Healthy.is_operational());
assert!(ClusterHealth::Degraded.is_operational());
assert!(ClusterHealth::Unstable.is_operational());
assert!(!ClusterHealth::Critical.is_operational());
}
}