use std::net::SocketAddr;
use std::path::PathBuf;
use config::ConfigError;
use d_engine_proto::common::NodeRole::Follower;
use d_engine_proto::common::NodeStatus;
use d_engine_proto::server::cluster::NodeMeta;
use serde::Deserialize;
use serde::Serialize;
#[cfg(debug_assertions)]
use tracing::warn;
use super::validate_directory;
use crate::Error;
use crate::Result;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ClusterConfig {
#[serde(default = "default_node_id")]
pub node_id: u32,
#[serde(default = "default_listen_addr")]
pub listen_address: SocketAddr,
#[serde(default = "default_initial_cluster")]
pub initial_cluster: Vec<NodeMeta>,
#[serde(default = "default_db_dir")]
pub db_root_dir: PathBuf,
#[serde(default = "default_log_dir")]
pub log_dir: PathBuf,
}
impl Default for ClusterConfig {
fn default() -> Self {
Self {
node_id: default_node_id(),
listen_address: default_listen_addr(),
initial_cluster: vec![],
db_root_dir: default_db_dir(),
log_dir: default_log_dir(),
}
}
}
impl ClusterConfig {
pub fn validate(&self) -> Result<()> {
if self.node_id == 0 {
return Err(Error::Config(ConfigError::Message(
"node_id cannot be 0 (reserved for invalid nodes)".into(),
)));
}
if self.initial_cluster.is_empty() {
return Err(Error::Config(ConfigError::Message(
"initial_cluster must contain at least one node".into(),
)));
}
let self_in_cluster = self.initial_cluster.iter().any(|n| n.id == self.node_id);
if !self_in_cluster {
return Err(Error::Config(ConfigError::Message(format!(
"Current node {} not found in initial_cluster",
self.node_id
))));
}
let mut ids = std::collections::HashSet::new();
for node in &self.initial_cluster {
if !ids.insert(node.id) {
return Err(Error::Config(ConfigError::Message(format!(
"Duplicate node_id {} in initial_cluster",
node.id
))));
}
}
if self.listen_address.port() == 0 {
return Err(Error::Config(ConfigError::Message(
"listen_address must specify a non-zero port".into(),
)));
}
if self.db_root_dir == PathBuf::from("/tmp/db") {
#[cfg(not(debug_assertions))]
{
return Err(Error::Config(ConfigError::Message(
"db_root_dir not configured. Using /tmp/db is not allowed in release builds. \
Please set CONFIG_PATH environment variable or configure [cluster.db_root_dir] \
in your config file.".into()
)));
}
#[cfg(debug_assertions)]
{
warn!(
"⚠️ Using default /tmp/db (data will be lost on reboot). \
Set CONFIG_PATH or configure [cluster.db_root_dir] for production."
);
}
}
validate_directory(&self.db_root_dir, "db_root_dir")?;
validate_directory(&self.log_dir, "log_dir")?;
Ok(())
}
}
fn default_node_id() -> u32 {
1
}
fn default_initial_cluster() -> Vec<NodeMeta> {
vec![NodeMeta {
id: 1,
address: "127.0.0.1:8080".to_string(),
role: Follower as i32,
status: NodeStatus::Active.into(),
}]
}
fn default_listen_addr() -> SocketAddr {
"127.0.0.1:9081".parse().unwrap()
}
fn default_db_dir() -> PathBuf {
PathBuf::from("/tmp/db")
}
fn default_log_dir() -> PathBuf {
PathBuf::from("/tmp/logs")
}