use crate::error::NetResult;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Lifecycle / health state of a cluster node.
///
/// `NodeInfo::is_healthy` treats only `Active` and `Degraded` as healthy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeState {
/// State assigned to every node created through `NodeInfo::new`.
Initializing,
/// Set on the local node by `ClusterNode::start`; counts as healthy.
Active,
/// Counts as healthy; not assigned anywhere in the visible code.
Degraded,
/// Assigned by the health-check task to nodes without a recent heartbeat.
Unhealthy,
/// First state written by `ClusterNode::shutdown`.
ShuttingDown,
/// Final state written by `ClusterNode::shutdown`.
Offline,
}
/// Role a node plays within the cluster.
///
/// NOTE(review): no visible code branches on the role, so the intended
/// responsibilities of each variant should be confirmed with the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
/// Not assigned anywhere in the visible code.
Primary,
/// Role given to every node created through `NodeInfo::new`.
Secondary,
/// Not assigned anywhere in the visible code.
Edge,
}
/// Snapshot of a node's identity, placement, state and load figures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
/// Node identifier; `NodeInfo::new` generates a random v4 UUID.
pub id: Uuid,
/// Node name as supplied to `NodeInfo::new`.
pub name: String,
/// Socket address associated with the node.
pub address: SocketAddr,
/// Role of the node; `NodeInfo::new` starts every node as `Secondary`.
pub role: NodeRole,
/// Current state; `NodeInfo::new` starts every node as `Initializing`.
pub state: NodeState,
/// Region label; compared against the viewer region in
/// `ClusterNode::select_node_for_viewer`.
pub region: String,
/// Stream count last reported through `ClusterNode::update_metrics`.
pub stream_count: usize,
/// Viewer count last reported through `ClusterNode::update_metrics`; also
/// the load measure used by `ClusterNode::select_node_for_viewer`.
pub viewer_count: usize,
/// CPU usage figure.
/// NOTE(review): unit/range (fraction vs. percent) is not established here — confirm.
pub cpu_usage: f64,
/// Memory usage figure.
/// NOTE(review): unit/range is not established here — confirm.
pub memory_usage: f64,
/// Bandwidth usage figure.
/// NOTE(review): unit is not established here — confirm.
pub bandwidth_usage: u64,
/// Time of the most recent heartbeat; read by `has_recent_heartbeat`.
pub last_heartbeat: DateTime<Utc>,
/// Time at which this record was created by `NodeInfo::new`.
pub start_time: DateTime<Utc>,
/// Crate version (`CARGO_PKG_VERSION`) captured at compile time.
pub version: String,
}
impl NodeInfo {
    /// Creates the record for a node that has just come into existence.
    ///
    /// The node receives a random v4 UUID, the `Secondary` role, the
    /// `Initializing` state and zeroed load figures. `last_heartbeat` and
    /// `start_time` are both set to the current UTC time, and `version` is the
    /// crate version baked in at compile time.
    #[must_use]
    pub fn new(name: impl Into<String>, address: SocketAddr, region: impl Into<String>) -> Self {
        // Sample the clock once so both timestamps are identical instead of
        // `start_time` ending up a few nanoseconds after `last_heartbeat`.
        let now = Utc::now();
        Self {
            id: Uuid::new_v4(),
            name: name.into(),
            address,
            role: NodeRole::Secondary,
            state: NodeState::Initializing,
            region: region.into(),
            stream_count: 0,
            viewer_count: 0,
            cpu_usage: 0.0,
            memory_usage: 0.0,
            bandwidth_usage: 0,
            last_heartbeat: now,
            start_time: now,
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    /// Returns `true` when the node is in the `Active` or `Degraded` state.
    #[must_use]
    pub fn is_healthy(&self) -> bool {
        matches!(self.state, NodeState::Active | NodeState::Degraded)
    }

    /// Returns `true` when the last heartbeat is younger than `timeout`.
    ///
    /// A heartbeat timestamped in the future relative to the local clock
    /// (for example because of clock skew between nodes) is considered
    /// recent rather than stale.
    #[must_use]
    pub fn has_recent_heartbeat(&self, timeout: Duration) -> bool {
        match (Utc::now() - self.last_heartbeat).to_std() {
            Ok(elapsed) => elapsed < timeout,
            // `to_std` rejects negative durations, i.e. the heartbeat lies
            // ahead of the local clock. That is certainly not older than
            // `timeout`, so it must not flag the node as having gone silent.
            Err(_) => true,
        }
    }
}
/// Settings used to build and run a `ClusterNode`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
/// Name given to the local node; the default comes from `hostname::get()`
/// and falls back to `"unknown"`.
pub node_name: String,
/// Address recorded for the local node; defaults to `0.0.0.0:8080`.
pub node_address: SocketAddr,
/// Region label of the local node; defaults to `"default"`. Seed nodes
/// registered during discovery are given the same region.
pub region: String,
/// Addresses registered as peers when the node starts; empty by default.
pub seed_nodes: Vec<SocketAddr>,
/// Tick period of the heartbeat task; defaults to 5 seconds.
pub heartbeat_interval: Duration,
/// Heartbeat age after which the health check marks a node `Unhealthy`;
/// defaults to 15 seconds.
pub heartbeat_timeout: Duration,
/// Defaults to `true`.
/// NOTE(review): not read anywhere in the visible code — confirm usage.
pub enable_replication: bool,
/// Upper bound on the number of nodes chosen per stream by
/// `ClusterNode::assign_stream`; defaults to 2.
pub replication_factor: usize,
/// Defaults to `true`.
/// NOTE(review): not read anywhere in the visible code — confirm usage.
pub enable_failover: bool,
}
impl Default for ClusterConfig {
fn default() -> Self {
Self {
node_name: hostname::get()
.ok()
.and_then(|h| h.into_string().ok())
.unwrap_or_else(|| "unknown".to_string()),
node_address: "0.0.0.0:8080".parse().expect("valid address"),
region: "default".to_string(),
seed_nodes: Vec::new(),
heartbeat_interval: Duration::from_secs(5),
heartbeat_timeout: Duration::from_secs(15),
enable_replication: true,
replication_factor: 2,
enable_failover: true,
}
}
}
/// The local node's view of the cluster.
///
/// Holds the local node's [`NodeInfo`], the table of known peers and the
/// stream-to-node assignment table. The local info and the peer table are
/// reference-counted so that the background tasks spawned by
/// [`ClusterNode::start`] work on the very same data the accessors read.
pub struct ClusterNode {
    /// Record of this node; shared with the background tasks.
    local_info: Arc<RwLock<NodeInfo>>,
    /// Configuration captured at construction time.
    config: ClusterConfig,
    /// Known peers keyed by node id; shared with the health-check task.
    nodes: Arc<RwLock<HashMap<Uuid, NodeInfo>>>,
    /// Node ids chosen for each stream id by `assign_stream`.
    stream_assignments: RwLock<HashMap<Uuid, Vec<Uuid>>>,
}

impl ClusterNode {
    /// Period between two passes of the peer health check.
    const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(5);

    /// Builds a node from `config`.
    ///
    /// The local record is created with [`NodeInfo::new`] from the configured
    /// name, address and region; the peer and assignment tables start empty.
    /// No background work is started until [`ClusterNode::start`] is called.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub async fn new(config: ClusterConfig) -> NetResult<Self> {
        let local_info = NodeInfo::new(&config.node_name, config.node_address, &config.region);
        Ok(Self {
            local_info: Arc::new(RwLock::new(local_info)),
            config,
            nodes: Arc::new(RwLock::new(HashMap::new())),
            stream_assignments: RwLock::new(HashMap::new()),
        })
    }

    /// Marks the local node `Active`, spawns the heartbeat and health-check
    /// tasks and registers the configured seed nodes.
    ///
    /// The tasks are spawned with `tokio::spawn`, so this must run inside a
    /// Tokio runtime.
    ///
    /// # Errors
    ///
    /// Propagates any error from seed discovery (which currently always
    /// succeeds).
    pub async fn start(&self) -> NetResult<()> {
        self.local_info.write().state = NodeState::Active;
        self.start_heartbeat_task();
        self.start_health_check_task();
        self.discover_nodes().await
    }

    /// Registers every configured seed address as a peer.
    ///
    /// No network contact is made: each seed gets a fresh [`NodeInfo`] named
    /// `node-<address>` in the local region. Seeds whose address is already
    /// present in the peer table are skipped, so repeated calls (or duplicate
    /// entries in the configuration) do not create duplicate peers.
    async fn discover_nodes(&self) -> NetResult<()> {
        let mut nodes = self.nodes.write();
        for seed_addr in &self.config.seed_nodes {
            if nodes.values().any(|known| known.address == *seed_addr) {
                continue;
            }
            let node_info =
                NodeInfo::new(format!("node-{seed_addr}"), *seed_addr, &self.config.region);
            nodes.insert(node_info.id, node_info);
        }
        Ok(())
    }

    /// Spawns the task that refreshes the local node's `last_heartbeat` every
    /// `heartbeat_interval`.
    ///
    /// The task works on the node's real record (not a copy) and ends once
    /// the node has been dropped or has reached the `Offline` state.
    fn start_heartbeat_task(&self) {
        let period = self.config.heartbeat_interval;
        // A weak handle lets the task observe the live record without keeping
        // it alive after the `ClusterNode` itself is gone.
        let weak_info = Arc::downgrade(&self.local_info);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(period);
            loop {
                ticker.tick().await;
                match weak_info.upgrade() {
                    Some(shared) => {
                        let mut info = shared.write();
                        if info.state == NodeState::Offline {
                            break;
                        }
                        info.last_heartbeat = Utc::now();
                    }
                    None => break,
                }
            }
        });
    }

    /// Spawns the task that periodically marks peers without a recent
    /// heartbeat as `Unhealthy`.
    ///
    /// The task inspects the live peer table, so peers registered after the
    /// task was spawned are checked as well. It ends once the node has been
    /// dropped or has reached the `Offline` state.
    fn start_health_check_task(&self) {
        let timeout = self.config.heartbeat_timeout;
        let period = Self::HEALTH_CHECK_INTERVAL;
        let weak_info = Arc::downgrade(&self.local_info);
        let weak_nodes = Arc::downgrade(&self.nodes);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(period);
            loop {
                ticker.tick().await;
                match (weak_info.upgrade(), weak_nodes.upgrade()) {
                    (Some(local), Some(peers)) => {
                        if local.read().state == NodeState::Offline {
                            break;
                        }
                        for node in peers.write().values_mut() {
                            if !node.has_recent_heartbeat(timeout) {
                                node.state = NodeState::Unhealthy;
                            }
                        }
                    }
                    _ => break,
                }
            }
        });
    }

    /// Chooses the nodes that should carry `stream_id` and records the choice.
    ///
    /// Up to `replication_factor` healthy peers are selected. When no peer is
    /// healthy the local node is used instead. The result is stored in every
    /// case, so [`ClusterNode::get_stream_nodes`] returns exactly what this
    /// call returned.
    pub fn assign_stream(&self, stream_id: Uuid) -> Vec<Uuid> {
        let healthy_ids: Vec<Uuid> = self
            .nodes
            .read()
            .values()
            .filter(|n| n.is_healthy())
            .map(|n| n.id)
            .collect();

        let assigned = if healthy_ids.is_empty() {
            vec![self.local_info.read().id]
        } else {
            let mut ids = healthy_ids;
            ids.truncate(self.config.replication_factor);
            ids
        };

        self.stream_assignments
            .write()
            .insert(stream_id, assigned.clone());
        assigned
    }

    /// Returns the node ids recorded for `stream_id`, or `None` when the
    /// stream has never been assigned.
    #[must_use]
    pub fn get_stream_nodes(&self, stream_id: Uuid) -> Option<Vec<Uuid>> {
        self.stream_assignments.read().get(&stream_id).cloned()
    }

    /// Stores the given stream and viewer counts on the local record.
    ///
    /// `cpu_usage` and `memory_usage` are reset to `0.0` on every call; no
    /// real measurement is taken here.
    pub fn update_metrics(&self, stream_count: usize, viewer_count: usize) {
        let mut info = self.local_info.write();
        info.stream_count = stream_count;
        info.viewer_count = viewer_count;
        info.cpu_usage = 0.0;
        info.memory_usage = 0.0;
    }

    /// Returns a copy of the local node's record.
    #[must_use]
    pub fn local_info(&self) -> NodeInfo {
        self.local_info.read().clone()
    }

    /// Returns copies of all known peers (the local node is not included).
    #[must_use]
    pub fn cluster_nodes(&self) -> Vec<NodeInfo> {
        self.nodes.read().values().cloned().collect()
    }

    /// Returns copies of all peers for which [`NodeInfo::is_healthy`] holds.
    #[must_use]
    pub fn healthy_nodes(&self) -> Vec<NodeInfo> {
        self.nodes
            .read()
            .values()
            .filter(|n| n.is_healthy())
            .cloned()
            .collect()
    }

    /// Picks the node a viewer should connect to.
    ///
    /// Preference order: the healthy peer with the fewest viewers in
    /// `viewer_region` (when a region is given and matches), then the healthy
    /// peer with the fewest viewers overall, then the local node when no peer
    /// is healthy. The current implementation therefore always returns
    /// `Some`.
    #[must_use]
    pub fn select_node_for_viewer(&self, viewer_region: Option<&str>) -> Option<NodeInfo> {
        let nodes = self.nodes.read();
        let healthy: Vec<&NodeInfo> = nodes.values().filter(|n| n.is_healthy()).collect();

        // Within the viewer's region, balance by load instead of taking
        // whichever match the hash map happens to yield first.
        let regional = viewer_region.and_then(|region| {
            healthy
                .iter()
                .filter(|n| n.region == region)
                .min_by_key(|n| n.viewer_count)
        });
        let chosen = regional.or_else(|| healthy.iter().min_by_key(|n| n.viewer_count));

        Some(match chosen {
            Some(node) => (*node).clone(),
            None => self.local_info.read().clone(),
        })
    }

    /// Moves the local node to the `Offline` state.
    ///
    /// The background tasks notice the `Offline` state on their next tick
    /// and stop.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub async fn shutdown(&self) -> NetResult<()> {
        let mut info = self.local_info.write();
        // No draining work happens between the two transitions yet, so the
        // `ShuttingDown` state is never observable by other readers.
        info.state = NodeState::ShuttingDown;
        info.state = NodeState::Offline;
        Ok(())
    }
}
/// Minimal in-crate stand-in for a hostname lookup.
mod hostname {
    use std::ffi::OsString;
    use std::io;

    /// Returns the name to use for the local host.
    ///
    /// This implementation performs no system call: it always yields the
    /// fixed name `localhost` and never produces an error.
    #[must_use]
    pub fn get() -> io::Result<OsString> {
        let fixed_name: OsString = "localhost".into();
        Ok(fixed_name)
    }
}