rocketmq_controller/
config.rs1use std::net::SocketAddr;
19use std::path::PathBuf;
20
21use serde::Deserialize;
22use serde::Serialize;
23
24use crate::error::ControllerError;
25use crate::error::Result;
26use crate::storage::StorageConfig;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct RaftPeer {
31 pub id: u64,
33
34 pub addr: SocketAddr,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
40pub enum StorageBackendType {
41 RocksDB,
43
44 File,
46
47 Memory,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ControllerConfig {
54 pub node_id: u64,
56
57 pub listen_addr: SocketAddr,
59
60 pub raft_peers: Vec<RaftPeer>,
62
63 pub storage_path: PathBuf,
65
66 pub storage_backend: StorageBackendType,
68
69 pub election_timeout_ms: u64,
71
72 pub heartbeat_interval_ms: u64,
74}
75
76impl ControllerConfig {
77 pub fn new(node_id: u64, listen_addr: SocketAddr) -> Self {
79 Self {
80 node_id,
81 listen_addr,
82 raft_peers: Vec::new(),
83 storage_path: PathBuf::from("/tmp/rocketmq-controller"),
84 storage_backend: StorageBackendType::RocksDB,
85 election_timeout_ms: 1000,
86 heartbeat_interval_ms: 300,
87 }
88 }
89
90 pub fn with_raft_peers(mut self, peers: Vec<RaftPeer>) -> Self {
92 self.raft_peers = peers;
93 self
94 }
95
96 pub fn with_storage_path(mut self, path: PathBuf) -> Self {
98 self.storage_path = path;
99 self
100 }
101
102 pub fn with_storage_backend(mut self, backend: StorageBackendType) -> Self {
104 self.storage_backend = backend;
105 self
106 }
107
108 pub fn with_election_timeout_ms(mut self, timeout_ms: u64) -> Self {
110 self.election_timeout_ms = timeout_ms;
111 self
112 }
113
114 pub fn with_heartbeat_interval_ms(mut self, interval_ms: u64) -> Self {
116 self.heartbeat_interval_ms = interval_ms;
117 self
118 }
119
120 pub fn validate(&self) -> Result<()> {
122 if self.node_id == 0 {
123 return Err(ControllerError::ConfigError(
124 "Node ID cannot be 0".to_string(),
125 ));
126 }
127
128 if self.election_timeout_ms == 0 {
129 return Err(ControllerError::ConfigError(
130 "Election timeout cannot be 0".to_string(),
131 ));
132 }
133
134 if self.heartbeat_interval_ms == 0 {
135 return Err(ControllerError::ConfigError(
136 "Heartbeat interval cannot be 0".to_string(),
137 ));
138 }
139
140 if self.heartbeat_interval_ms >= self.election_timeout_ms {
141 return Err(ControllerError::ConfigError(
142 "Heartbeat interval must be less than election timeout".to_string(),
143 ));
144 }
145
146 Ok(())
147 }
148
149 pub fn to_storage_config(&self) -> StorageConfig {
151 match self.storage_backend {
152 #[cfg(feature = "storage-rocksdb")]
153 StorageBackendType::RocksDB => StorageConfig::RocksDB {
154 path: self.storage_path.join("rocksdb"),
155 },
156
157 #[cfg(feature = "storage-file")]
158 StorageBackendType::File => StorageConfig::File {
159 path: self.storage_path.join("filedb"),
160 },
161
162 StorageBackendType::Memory => StorageConfig::Memory,
163
164 #[allow(unreachable_patterns)]
165 _ => StorageConfig::Memory,
166 }
167 }
168
169 #[cfg(test)]
171 pub fn test_config() -> Self {
172 Self {
173 node_id: 1,
174 listen_addr: "127.0.0.1:29876".parse().unwrap(),
175 raft_peers: vec![RaftPeer {
176 id: 1,
177 addr: "127.0.0.1:29876".parse().unwrap(),
178 }],
179 storage_path: std::path::PathBuf::from("/tmp/controller_test"),
180 storage_backend: StorageBackendType::Memory,
181 election_timeout_ms: 1000,
182 heartbeat_interval_ms: 300,
183 }
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn test_config_builder() {
193 let config = ControllerConfig::new(1, "127.0.0.1:9876".parse().unwrap())
194 .with_election_timeout_ms(2000)
195 .with_heartbeat_interval_ms(600);
196
197 assert_eq!(config.node_id, 1);
198 assert_eq!(config.election_timeout_ms, 2000);
199 assert_eq!(config.heartbeat_interval_ms, 600);
200 }
201
202 #[test]
203 fn test_config_validation() {
204 let config = ControllerConfig::new(1, "127.0.0.1:9876".parse().unwrap());
205 assert!(config.validate().is_ok());
206
207 let invalid_config = ControllerConfig::new(0, "127.0.0.1:9876".parse().unwrap());
208 assert!(invalid_config.validate().is_err());
209 }
210}