rocketmq_controller/
config.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::net::SocketAddr;
19use std::path::PathBuf;
20
21use serde::Deserialize;
22use serde::Serialize;
23
24use crate::error::ControllerError;
25use crate::error::Result;
26use crate::storage::StorageConfig;
27
28/// Raft peer configuration
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct RaftPeer {
31    /// Node ID
32    pub id: u64,
33
34    /// Peer address
35    pub addr: SocketAddr,
36}
37
38/// Storage backend type
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
40pub enum StorageBackendType {
41    /// RocksDB storage
42    RocksDB,
43
44    /// File-based storage
45    File,
46
47    /// In-memory storage (for testing)
48    Memory,
49}
50
51/// Controller configuration
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ControllerConfig {
54    /// Node ID
55    pub node_id: u64,
56
57    /// Listen address
58    pub listen_addr: SocketAddr,
59
60    /// Raft peer list
61    pub raft_peers: Vec<RaftPeer>,
62
63    /// Storage path
64    pub storage_path: PathBuf,
65
66    /// Storage backend type
67    pub storage_backend: StorageBackendType,
68
69    /// Election timeout in milliseconds
70    pub election_timeout_ms: u64,
71
72    /// Heartbeat interval in milliseconds
73    pub heartbeat_interval_ms: u64,
74}
75
76impl ControllerConfig {
77    /// Create a new controller configuration
78    pub fn new(node_id: u64, listen_addr: SocketAddr) -> Self {
79        Self {
80            node_id,
81            listen_addr,
82            raft_peers: Vec::new(),
83            storage_path: PathBuf::from("/tmp/rocketmq-controller"),
84            storage_backend: StorageBackendType::RocksDB,
85            election_timeout_ms: 1000,
86            heartbeat_interval_ms: 300,
87        }
88    }
89
90    /// Set Raft peers
91    pub fn with_raft_peers(mut self, peers: Vec<RaftPeer>) -> Self {
92        self.raft_peers = peers;
93        self
94    }
95
96    /// Set storage path
97    pub fn with_storage_path(mut self, path: PathBuf) -> Self {
98        self.storage_path = path;
99        self
100    }
101
102    /// Set storage backend
103    pub fn with_storage_backend(mut self, backend: StorageBackendType) -> Self {
104        self.storage_backend = backend;
105        self
106    }
107
108    /// Set election timeout
109    pub fn with_election_timeout_ms(mut self, timeout_ms: u64) -> Self {
110        self.election_timeout_ms = timeout_ms;
111        self
112    }
113
114    /// Set heartbeat interval
115    pub fn with_heartbeat_interval_ms(mut self, interval_ms: u64) -> Self {
116        self.heartbeat_interval_ms = interval_ms;
117        self
118    }
119
120    /// Validate the configuration
121    pub fn validate(&self) -> Result<()> {
122        if self.node_id == 0 {
123            return Err(ControllerError::ConfigError(
124                "Node ID cannot be 0".to_string(),
125            ));
126        }
127
128        if self.election_timeout_ms == 0 {
129            return Err(ControllerError::ConfigError(
130                "Election timeout cannot be 0".to_string(),
131            ));
132        }
133
134        if self.heartbeat_interval_ms == 0 {
135            return Err(ControllerError::ConfigError(
136                "Heartbeat interval cannot be 0".to_string(),
137            ));
138        }
139
140        if self.heartbeat_interval_ms >= self.election_timeout_ms {
141            return Err(ControllerError::ConfigError(
142                "Heartbeat interval must be less than election timeout".to_string(),
143            ));
144        }
145
146        Ok(())
147    }
148
149    /// Convert to storage configuration
150    pub fn to_storage_config(&self) -> StorageConfig {
151        match self.storage_backend {
152            #[cfg(feature = "storage-rocksdb")]
153            StorageBackendType::RocksDB => StorageConfig::RocksDB {
154                path: self.storage_path.join("rocksdb"),
155            },
156
157            #[cfg(feature = "storage-file")]
158            StorageBackendType::File => StorageConfig::File {
159                path: self.storage_path.join("filedb"),
160            },
161
162            StorageBackendType::Memory => StorageConfig::Memory,
163
164            #[allow(unreachable_patterns)]
165            _ => StorageConfig::Memory,
166        }
167    }
168
169    /// Create a test configuration (for testing only)
170    #[cfg(test)]
171    pub fn test_config() -> Self {
172        Self {
173            node_id: 1,
174            listen_addr: "127.0.0.1:29876".parse().unwrap(),
175            raft_peers: vec![RaftPeer {
176                id: 1,
177                addr: "127.0.0.1:29876".parse().unwrap(),
178            }],
179            storage_path: std::path::PathBuf::from("/tmp/controller_test"),
180            storage_backend: StorageBackendType::Memory,
181            election_timeout_ms: 1000,
182            heartbeat_interval_ms: 300,
183        }
184    }
185}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a configuration with default settings for `node_id`.
    fn config_for(node_id: u64) -> ControllerConfig {
        ControllerConfig::new(node_id, "127.0.0.1:9876".parse().unwrap())
    }

    #[test]
    fn test_config_builder() {
        let config = config_for(1)
            .with_election_timeout_ms(2000)
            .with_heartbeat_interval_ms(600);

        assert_eq!(config.node_id, 1);
        assert_eq!(config.election_timeout_ms, 2000);
        assert_eq!(config.heartbeat_interval_ms, 600);
    }

    #[test]
    fn test_config_validation() {
        let config = config_for(1);
        assert!(config.validate().is_ok());

        let invalid_config = config_for(0);
        assert!(invalid_config.validate().is_err());
    }

    #[test]
    fn test_validation_rejects_zero_election_timeout() {
        let config = config_for(1).with_election_timeout_ms(0);
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_validation_rejects_zero_heartbeat_interval() {
        let config = config_for(1).with_heartbeat_interval_ms(0);
        assert!(config.validate().is_err());
    }

    #[test]
    fn test_validation_requires_heartbeat_below_election_timeout() {
        // Equal values are rejected: the heartbeat must be strictly smaller.
        let equal = config_for(1)
            .with_election_timeout_ms(500)
            .with_heartbeat_interval_ms(500);
        assert!(equal.validate().is_err());

        let larger = config_for(1)
            .with_election_timeout_ms(500)
            .with_heartbeat_interval_ms(600);
        assert!(larger.validate().is_err());

        // One millisecond below the election timeout is the largest valid value.
        let just_below = config_for(1)
            .with_election_timeout_ms(500)
            .with_heartbeat_interval_ms(499);
        assert!(just_below.validate().is_ok());
    }
}