d_engine/config/
cluster.rs1use std::net::SocketAddr;
2use std::path::PathBuf;
3
4use serde::Deserialize;
5use serde::Serialize;
6
7use crate::proto::NodeMeta;
8use crate::Error;
9use crate::Result;
10use crate::FOLLOWER;
11
12#[derive(Debug, Serialize, Deserialize, Clone)]
22pub struct ClusterConfig {
23 #[serde(default = "default_node_id")]
27 pub node_id: u32,
28
29 #[serde(default = "default_listen_addr")]
33 pub listen_address: SocketAddr,
34
35 #[serde(default = "default_initial_cluster")]
42 pub initial_cluster: Vec<NodeMeta>,
43
44 #[serde(default = "default_db_dir")]
48 pub db_root_dir: PathBuf,
49
50 #[serde(default = "default_log_dir")]
54 pub log_dir: PathBuf,
55}
56impl Default for ClusterConfig {
57 fn default() -> Self {
58 Self {
59 node_id: default_node_id(),
60 listen_address: default_listen_addr(),
61 initial_cluster: vec![],
62 db_root_dir: default_db_dir(),
63 log_dir: default_log_dir(),
64 }
65 }
66}
67
68impl ClusterConfig {
69 pub fn validate(&self) -> Result<()> {
73 if self.node_id == 0 {
75 return Err(Error::InvalidConfig(
76 "node_id cannot be 0 (reserved for invalid nodes)".into(),
77 ));
78 }
79
80 if self.initial_cluster.is_empty() {
82 return Err(Error::InvalidConfig(
83 "initial_cluster must contain at least one node".into(),
84 ));
85 }
86
87 let self_in_cluster = self.initial_cluster.iter().any(|n| n.id == self.node_id);
89 if !self_in_cluster {
90 return Err(Error::InvalidConfig(format!(
91 "Current node {} not found in initial_cluster",
92 self.node_id
93 )));
94 }
95
96 let mut ids = std::collections::HashSet::new();
98 for node in &self.initial_cluster {
99 if !ids.insert(node.id) {
100 return Err(Error::InvalidConfig(format!(
101 "Duplicate node_id {} in initial_cluster",
102 node.id
103 )));
104 }
105 }
106
107 if self.listen_address.port() == 0 {
109 return Err(Error::InvalidConfig(
110 "listen_address must specify a non-zero port".into(),
111 ));
112 }
113
114 self.validate_directory(&self.db_root_dir, "db_root_dir")?;
116 self.validate_directory(&self.log_dir, "log_dir")?;
117
118 Ok(())
119 }
120
121 fn validate_directory(
123 &self,
124 path: &PathBuf,
125 name: &str,
126 ) -> Result<()> {
127 if path.as_os_str().is_empty() {
128 return Err(Error::InvalidConfig(format!("{} path cannot be empty", name)));
129 }
130
131 #[cfg(not(test))]
132 {
133 use std::fs;
134 if !path.exists() {
136 fs::create_dir_all(path).map_err(|e| {
137 Error::InvalidConfig(format!(
138 "Failed to create {} directory at {}: {}",
139 name,
140 path.display(),
141 e
142 ))
143 })?;
144 }
145
146 let test_file = path.join(".permission_test");
148 fs::write(&test_file, b"test").map_err(|e| {
149 Error::InvalidConfig(format!(
150 "No write permission in {} directory {}: {}",
151 name,
152 path.display(),
153 e
154 ))
155 })?;
156 fs::remove_file(&test_file).ok();
157 }
158
159 Ok(())
160 }
161}
162
163fn default_node_id() -> u32 {
164 1
165}
166fn default_initial_cluster() -> Vec<NodeMeta> {
167 vec![NodeMeta {
168 id: 1,
169 ip: "127.0.0.1".to_string(),
170 port: 8080,
171 role: FOLLOWER,
172 }]
173}
174fn default_listen_addr() -> SocketAddr {
175 "127.0.0.1:9081".parse().unwrap()
176}
177fn default_db_dir() -> PathBuf {
178 PathBuf::from("/tmp/db")
179}
180fn default_log_dir() -> PathBuf {
181 PathBuf::from("/tmp/logs")
182}