//! Cluster membership, consensus, partitioning, and replication management.
//!
//! Ties together the Raft consensus layer (`raft`, `byzantine_raft`), data
//! partitioning (`partition`), and distributed query coordination
//! (`coordinator`) behind a single [`ClusterManager`].

pub mod byzantine_raft;
pub mod coordinator;
pub mod node;
pub mod partition;
pub mod raft;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{error::FusekiResult, store::Store};
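/// Top-level configuration for a single cluster node.
///
/// The defaults describe a standalone node with a random UUID identity
/// listening on `0.0.0.0:7000`. A minimal sketch of a node joining an
/// existing cluster (the seed addresses are illustrative):
///
/// ```ignore
/// let config = ClusterConfig {
///     seeds: vec!["10.0.0.1:7000".to_string(), "10.0.0.2:7000".to_string()],
///     ..ClusterConfig::default()
/// };
/// ```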
#[derive(Debug, Clone)]
pub struct ClusterConfig {
pub node_id: String,
pub bind_addr: SocketAddr,
pub seeds: Vec<String>,
pub raft: RaftConfig,
pub partitioning: PartitionConfig,
pub replication: ReplicationConfig,
}
impl Default for ClusterConfig {
fn default() -> Self {
Self {
node_id: Uuid::new_v4().to_string(),
bind_addr: "0.0.0.0:7000"
.parse()
.expect("default bind address should be valid"),
seeds: Vec::new(),
raft: RaftConfig::default(),
partitioning: PartitionConfig::default(),
replication: ReplicationConfig::default(),
}
}
}
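/// Raft consensus tuning parameters.
///
/// `election_timeout` is a `(min, max)` range from which followers are
/// expected to draw a randomized timeout so elections do not collide;
/// `heartbeat_interval` should stay well below the minimum election timeout.
/// `snapshot_threshold` is the log length at which a snapshot is intended to
/// be taken, and `pre_vote` enables the Raft pre-vote extension to avoid
/// disruptive elections from rejoining nodes.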
#[derive(Debug, Clone)]
pub struct RaftConfig {
pub election_timeout: (Duration, Duration),
pub heartbeat_interval: Duration,
pub max_append_entries: usize,
pub snapshot_threshold: u64,
pub pre_vote: bool,
}
impl Default for RaftConfig {
fn default() -> Self {
Self {
election_timeout: (Duration::from_millis(150), Duration::from_millis(300)),
heartbeat_interval: Duration::from_millis(50),
max_append_entries: 100,
snapshot_threshold: 10000,
pre_vote: true,
}
}
}
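/// Data partitioning configuration.
///
/// `partition_count` fixes the number of logical partitions; `vnodes` is the
/// number of virtual nodes each physical node is expected to place on the
/// hash ring when [`PartitionStrategy::ConsistentHashing`] is used.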
#[derive(Debug, Clone)]
pub struct PartitionConfig {
pub strategy: PartitionStrategy,
pub partition_count: u32,
pub vnodes: u32,
pub rebalancing: RebalancingConfig,
}
impl Default for PartitionConfig {
fn default() -> Self {
Self {
strategy: PartitionStrategy::ConsistentHashing,
partition_count: 128,
vnodes: 100,
rebalancing: RebalancingConfig::default(),
}
}
}
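/// Strategy used to map data onto partitions.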
#[derive(Debug, Clone)]
pub enum PartitionStrategy {
ConsistentHashing,
Range,
Custom,
}
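/// Automatic partition rebalancing settings.
///
/// `threshold` expresses how much load imbalance is tolerated before a
/// rebalance is triggered (interpreted by the partition manager), and
/// `check_interval` controls how often the background check runs.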
#[derive(Debug, Clone)]
pub struct RebalancingConfig {
pub enabled: bool,
pub threshold: f64,
pub max_concurrent_moves: usize,
pub check_interval: Duration,
}
impl Default for RebalancingConfig {
fn default() -> Self {
Self {
enabled: true,
            threshold: 0.2,
            max_concurrent_moves: 3,
            check_interval: Duration::from_secs(300),
        }
}
}
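/// Replication settings: how many copies of each partition are kept and the
/// default consistency levels used for writes and reads.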
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
pub factor: usize,
pub write_consistency: ConsistencyLevel,
pub read_consistency: ConsistencyLevel,
pub read_repair: bool,
}
impl Default for ReplicationConfig {
fn default() -> Self {
Self {
factor: 3,
write_consistency: ConsistencyLevel::Quorum,
read_consistency: ConsistencyLevel::One,
read_repair: true,
}
}
}
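/// Tunable consistency levels in the Dynamo/Cassandra style.
///
/// `Quorum` conventionally means a majority of the replicas (two of three
/// with the default replication factor), `LocalQuorum` a majority within the
/// local datacenter, and `EachQuorum` a majority in every datacenter; the
/// exact read/write semantics are enforced by the query coordinator.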
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum ConsistencyLevel {
One,
Quorum,
All,
LocalQuorum,
EachQuorum,
}
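/// Membership information for a single cluster node. `last_heartbeat` is a
/// Unix timestamp in milliseconds.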
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
pub id: String,
pub addr: SocketAddr,
pub state: NodeState,
pub metadata: NodeMetadata,
pub last_heartbeat: i64,
}
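/// Lifecycle state of a cluster node. Nodes whose heartbeat is more than 30
/// seconds old are marked [`NodeState::Down`] by the failure-detection task.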
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum NodeState {
Joining,
Active,
Leaving,
Down,
Decommissioned,
}
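/// Node attributes intended for placement (datacenter/rack awareness) and
/// load reporting.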
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetadata {
pub datacenter: Option<String>,
pub rack: Option<String>,
pub capacity: u64,
pub load: f64,
pub version: String,
}
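/// A versioned snapshot of cluster membership as seen by this node.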
#[derive(Debug, Clone)]
pub struct ClusterView {
pub members: HashMap<String, NodeInfo>,
pub leader: Option<String>,
pub version: u64,
pub updated_at: i64,
}
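/// Orchestrates cluster membership, Raft consensus, partitioning, and query
/// coordination for a single node.
///
/// A minimal startup sketch (error handling elided; obtaining the `Store` is
/// application-specific):
///
/// ```ignore
/// let manager = ClusterManager::new(ClusterConfig::default(), store).await?;
/// manager.start().await?;
/// let health = manager.get_health().await;
/// ```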
pub struct ClusterManager {
config: ClusterConfig,
node_info: NodeInfo,
raft_node: Arc<raft::RaftNode>,
partition_manager: Arc<partition::PartitionManager>,
#[allow(dead_code)]
coordinator: Arc<coordinator::QueryCoordinator>,
cluster_view: Arc<RwLock<ClusterView>>,
}
impl ClusterManager {
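    /// Create a cluster manager for this node.
    ///
    /// This builds the Raft node, partition manager, and query coordinator
    /// but starts no background work; call [`Self::start`] for that.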
pub async fn new(config: ClusterConfig, store: Arc<Store>) -> FusekiResult<Self> {
let node_info = NodeInfo {
id: config.node_id.clone(),
addr: config.bind_addr,
state: NodeState::Joining,
metadata: NodeMetadata {
datacenter: None,
rack: None,
                capacity: 1000,
                load: 0.0,
version: env!("CARGO_PKG_VERSION").to_string(),
},
last_heartbeat: chrono::Utc::now().timestamp_millis(),
};
let raft_node = Arc::new(
raft::RaftNode::new(config.node_id.clone(), config.raft.clone(), store.clone()).await?,
);
let partition_manager = Arc::new(partition::PartitionManager::new(
config.partitioning.clone(),
store.clone(),
));
let coordinator = Arc::new(coordinator::QueryCoordinator::new(
config.replication.clone(),
store.clone(),
));
let cluster_view = Arc::new(RwLock::new(ClusterView {
members: HashMap::new(),
leader: None,
version: 0,
updated_at: chrono::Utc::now().timestamp_millis(),
}));
Ok(Self {
config,
node_info,
raft_node,
partition_manager,
coordinator,
cluster_view,
})
}
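    /// Start the node: bring up Raft, then either join an existing cluster
    /// through the configured seeds or bootstrap a new single-node cluster,
    /// and finally start partitioning and background maintenance tasks.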
pub async fn start(&self) -> FusekiResult<()> {
self.raft_node.start().await?;
if !self.config.seeds.is_empty() {
self.join_cluster().await?;
} else {
self.bootstrap_cluster().await?;
}
self.partition_manager.start().await?;
self.start_maintenance_tasks().await;
Ok(())
}
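    /// Join an existing cluster by contacting the configured seeds in order,
    /// stopping at the first one that responds. The seed handshake is still
    /// a stub, so joining currently proceeds even if no seed answers.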
async fn join_cluster(&self) -> FusekiResult<()> {
tracing::info!("Joining cluster with seeds: {:?}", self.config.seeds);
for seed in &self.config.seeds {
if let Ok(()) = self.contact_seed(seed).await {
break;
}
}
self.update_node_state(NodeState::Active).await;
Ok(())
}
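    /// Bootstrap a brand-new cluster with this node as the sole member and
    /// initial leader.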
async fn bootstrap_cluster(&self) -> FusekiResult<()> {
tracing::info!("Bootstrapping new cluster");
self.raft_node.bootstrap().await?;
let mut view = self.cluster_view.write().await;
view.members
.insert(self.node_info.id.clone(), self.node_info.clone());
view.leader = Some(self.node_info.id.clone());
view.version = 1;
self.update_node_state(NodeState::Active).await;
Ok(())
}
    /// Contact a seed node to exchange membership information.
    /// Not implemented yet: this stub always reports success.
    async fn contact_seed(&self, _seed: &str) -> FusekiResult<()> {
        Ok(())
    }
async fn update_node_state(&self, state: NodeState) {
let mut view = self.cluster_view.write().await;
if let Some(node) = view.members.get_mut(&self.node_info.id) {
node.state = state;
}
}
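    /// Spawn the background tasks: a 5-second heartbeat updater, a 10-second
    /// failure detector that marks nodes `Down` after 30 seconds of silence,
    /// and, if enabled, a periodic partition rebalancing check.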
async fn start_maintenance_tasks(&self) {
let cluster_view = self.cluster_view.clone();
let node_id = self.node_info.id.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let mut view = cluster_view.write().await;
if let Some(node) = view.members.get_mut(&node_id) {
node.last_heartbeat = chrono::Utc::now().timestamp_millis();
}
}
});
let cluster_view = self.cluster_view.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
let now = chrono::Utc::now().timestamp_millis();
let mut view = cluster_view.write().await;
            for node in view.members.values_mut() {
if node.state == NodeState::Active {
let elapsed = now - node.last_heartbeat;
if elapsed > 30000 {
node.state = NodeState::Down;
tracing::warn!("Node {} marked as down", node.id);
}
}
}
}
});
if self.config.partitioning.rebalancing.enabled {
            let partition_manager = self.partition_manager.clone();
            let check_interval = self.config.partitioning.rebalancing.check_interval;
            tokio::spawn(async move {
                let mut interval = tokio::time::interval(check_interval);
loop {
interval.tick().await;
if let Err(e) = partition_manager.check_rebalancing().await {
tracing::error!("Rebalancing check failed: {}", e);
}
}
});
}
}
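    /// Return a snapshot of the current cluster view.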
pub async fn get_cluster_view(&self) -> ClusterView {
self.cluster_view.read().await.clone()
}
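    /// Summarize cluster health: `Green` when no nodes are down, `Yellow`
    /// when some are down but at least `replication.factor` nodes remain
    /// active, and `Red` otherwise.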
pub async fn get_health(&self) -> ClusterHealth {
let view = self.cluster_view.read().await;
let total_nodes = view.members.len();
let active_nodes = view
.members
.values()
.filter(|n| n.state == NodeState::Active)
.count();
let down_nodes = view
.members
.values()
.filter(|n| n.state == NodeState::Down)
.count();
ClusterHealth {
status: if down_nodes == 0 {
HealthStatus::Green
} else if active_nodes >= self.config.replication.factor {
HealthStatus::Yellow
} else {
HealthStatus::Red
},
total_nodes,
active_nodes,
down_nodes,
has_leader: view.leader.is_some(),
partition_count: self.config.partitioning.partition_count as usize,
            // Under-replication is not yet computed; this is a placeholder value.
            under_replicated_partitions: 0,
        }
}
}
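/// Aggregated health report returned by [`ClusterManager::get_health`].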
#[derive(Debug, Clone, Serialize)]
pub struct ClusterHealth {
pub status: HealthStatus,
pub total_nodes: usize,
pub active_nodes: usize,
pub down_nodes: usize,
pub has_leader: bool,
pub partition_count: usize,
pub under_replicated_partitions: usize,
}
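/// Overall cluster health rating.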
#[derive(Debug, Clone, Copy, Serialize)]
pub enum HealthStatus {
Green,
Yellow,
Red,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cluster_config_default() {
let config = ClusterConfig::default();
assert!(!config.node_id.is_empty());
assert!(config.raft.pre_vote);
assert_eq!(config.partitioning.partition_count, 128);
assert_eq!(config.replication.factor, 3);
}
#[test]
fn test_consistency_levels() {
let quorum = ConsistencyLevel::Quorum;
assert_eq!(quorum, ConsistencyLevel::Quorum);
}
}