1#![deny(unsafe_code)]
2
3use std::fmt;
4use std::net::SocketAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use config::{Config, File, Source};
11use once_cell::sync::OnceCell;
12use serde::Deserialize;
13
14use rmqtt_net::Result;
15use rmqtt_utils::*;
16
17use self::listener::Listeners;
18use self::logging::Log;
19
20pub use self::listener::Listener;
21pub use self::options::Options;
22
23pub mod listener;
24pub mod logging;
25pub mod options;
26
27static SETTINGS: OnceCell<Settings> = OnceCell::new();
28
29#[derive(Clone)]
30pub struct Settings(Arc<Inner>);
31
32#[derive(Debug, Clone, Deserialize)]
33pub struct Inner {
34 #[serde(default)]
35 pub task: Task,
36 #[serde(default)]
37 pub node: Node,
38 #[serde(default)]
39 pub rpc: Rpc,
40 #[serde(default)]
41 pub log: Log,
42 #[serde(rename = "listener")]
43 #[serde(default)]
44 pub listeners: Listeners,
45 #[serde(default)]
46 pub plugins: Plugins,
47 #[serde(default)]
48 pub mqtt: Mqtt,
49 #[serde(default, skip)]
50 pub opts: Options,
51}
52
53impl Deref for Settings {
54 type Target = Inner;
55 fn deref(&self) -> &Self::Target {
56 self.0.as_ref()
57 }
58}
59
60impl Settings {
61 fn new(opts: Options) -> Result<Self> {
62 let mut builder = Config::builder()
63 .add_source(File::with_name("/etc/rmqtt/rmqtt").required(false))
64 .add_source(File::with_name("/etc/rmqtt").required(false))
65 .add_source(File::with_name("rmqtt").required(false))
66 .add_source(
67 config::Environment::with_prefix("rmqtt")
68 .try_parsing(true)
69 .list_separator(" ")
70 .with_list_parse_key("plugins.default_startups"),
71 );
72
73 if let Some(cfg) = opts.cfg_name.as_ref() {
74 builder = builder.add_source(File::with_name(cfg).required(false));
75 }
76
77 let mut inner: Inner = builder.build()?.try_deserialize()?;
78
79 inner.listeners.init();
80 if inner.listeners.tcps.is_empty() && inner.listeners.tlss.is_empty() {
81 inner.listeners.set_default();
83 }
84
85 if let Some(id) = opts.node_id {
87 if id > 0 {
88 inner.node.id = id;
89 }
90 }
91 if let Some(plugins_default_startups) = opts.plugins_default_startups.as_ref() {
92 inner.plugins.default_startups.clone_from(plugins_default_startups)
93 }
94
95 inner.opts = opts;
96 Ok(Self(Arc::new(inner)))
97 }
98
99 #[inline]
100 pub fn instance() -> &'static Self {
101 match SETTINGS.get() {
102 Some(c) => c,
103 None => {
104 unreachable!("Settings not initialized");
105 }
106 }
107 }
108
109 #[inline]
110 pub fn init(opts: Options) -> Result<&'static Self> {
111 SETTINGS.set(Settings::new(opts)?).map_err(|_| anyhow!("Settings init failed"))?;
112 SETTINGS.get().ok_or_else(|| anyhow!("Settings init failed"))
113 }
114
115 #[inline]
116 pub fn logs() -> Result<()> {
117 let cfg = Self::instance();
118 log::debug!("Config info is {:?}", cfg.0);
119 log::info!("node_id is {}", cfg.node.id);
120 log::info!("exec_workers is {}", cfg.task.exec_workers);
121 log::info!("exec_queue_max is {}", cfg.task.exec_queue_max);
122 log::info!("node.busy config is: {:?}", cfg.node.busy);
123 log::info!("node.rpc config is: {:?}", cfg.rpc);
124
125 if cfg.opts.node_grpc_addrs.is_some() {
126 log::info!("node_grpc_addrs is {:?}", cfg.opts.node_grpc_addrs);
127 }
128 if cfg.opts.raft_peer_addrs.is_some() {
129 log::info!("raft_peer_addrs is {:?}", cfg.opts.raft_peer_addrs);
130 }
131 if cfg.opts.raft_leader_id.is_some() {
132 log::info!("raft_leader_id is {:?}", cfg.opts.raft_leader_id);
133 }
134 Ok(())
135 }
136}
137
138impl fmt::Debug for Settings {
139 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
140 write!(f, "Settings ...")?;
141 Ok(())
142 }
143}
144
145#[derive(Debug, Clone, Deserialize)]
146pub struct Task {
147 #[serde(default = "Task::exec_workers_default")]
149 pub exec_workers: usize,
150
151 #[serde(default = "Task::exec_queue_max_default")]
153 pub exec_queue_max: usize,
154}
155
156impl Default for Task {
157 #[inline]
158 fn default() -> Self {
159 Self { exec_workers: Self::exec_workers_default(), exec_queue_max: Self::exec_queue_max_default() }
160 }
161}
162
163impl Task {
164 fn exec_workers_default() -> usize {
165 1000
166 }
167 fn exec_queue_max_default() -> usize {
168 300_000
169 }
170}
171
172#[derive(Default, Debug, Clone, Deserialize)]
173pub struct Node {
174 #[serde(default)]
175 pub id: NodeId,
176 #[serde(default = "Node::cookie_default")]
177 pub cookie: String,
178 #[serde(default)]
181 pub busy: Busy,
182}
183
184impl Node {
185 fn cookie_default() -> String {
186 "rmqttsecretcookie".into()
187 }
188 }
192
193#[derive(Debug, Clone, Deserialize)]
194pub struct Busy {
195 #[serde(default = "Busy::check_enable_default")]
197 pub check_enable: bool,
198 #[serde(default = "Busy::update_interval_default", deserialize_with = "deserialize_duration")]
200 pub update_interval: Duration,
201 #[serde(default = "Busy::loadavg_default")]
203 pub loadavg: f32, #[serde(default = "Busy::cpuloadavg_default")]
206 pub cpuloadavg: f32, #[serde(default)]
209 pub handshaking: isize, }
211
212impl Default for Busy {
213 #[inline]
214 fn default() -> Self {
215 Self {
216 check_enable: Self::check_enable_default(),
217 update_interval: Self::update_interval_default(),
218 loadavg: Self::loadavg_default(),
219 cpuloadavg: Self::cpuloadavg_default(),
220 handshaking: 0,
221 }
222 }
223}
224
225impl Busy {
226 fn check_enable_default() -> bool {
227 true
228 }
229 fn update_interval_default() -> Duration {
230 Duration::from_secs(2)
231 }
232 fn loadavg_default() -> f32 {
233 80.0
234 }
235
236 fn cpuloadavg_default() -> f32 {
237 90.0
238 }
239}
240
241#[derive(Debug, Clone, Deserialize)]
242pub struct Rpc {
243 #[serde(default = "Rpc::server_addr_default", deserialize_with = "deserialize_addr")]
244 pub server_addr: SocketAddr,
245
246 #[serde(default = "Rpc::reuseaddr_default")]
247 pub reuseaddr: bool,
248
249 #[serde(default = "Rpc::reuseport_default")]
250 pub reuseport: bool,
251}
252
253impl Default for Rpc {
254 #[inline]
255 fn default() -> Self {
256 Self {
257 reuseaddr: Self::reuseaddr_default(),
258 reuseport: Self::reuseport_default(),
259 server_addr: Self::server_addr_default(),
260 }
261 }
262}
263
264impl Rpc {
265 fn reuseaddr_default() -> bool {
266 true
267 }
268 fn reuseport_default() -> bool {
269 false
270 }
271 fn server_addr_default() -> SocketAddr {
272 ([0, 0, 0, 0], 5363).into()
273 }
274}
275
276#[derive(Default, Debug, Clone, Deserialize)]
277pub struct Plugins {
278 #[serde(default = "Plugins::dir_default")]
279 pub dir: String,
280 #[serde(default)]
281 pub default_startups: Vec<String>,
282}
283
284impl Plugins {
285 fn dir_default() -> String {
286 "./plugins/".into()
287 }
288
289 pub fn load_config<'de, T: serde::Deserialize<'de>>(&self, name: &str) -> Result<T> {
290 let (cfg, _) = self.load_config_with_required(name, true, &[])?;
291 Ok(cfg)
292 }
293
294 pub fn load_config_default<'de, T: serde::Deserialize<'de>>(&self, name: &str) -> Result<T> {
295 let (cfg, def) = self.load_config_with_required(name, false, &[])?;
296 if def {
297 log::warn!("The configuration for plugin '{name}' does not exist, default values will be used!");
298 }
299 Ok(cfg)
300 }
301
302 pub fn load_config_with<'de, T: serde::Deserialize<'de>>(
303 &self,
304 name: &str,
305 env_list_keys: &[&str],
306 ) -> Result<T> {
307 let (cfg, _) = self.load_config_with_required(name, true, env_list_keys)?;
308 Ok(cfg)
309 }
310
311 pub fn load_config_default_with<'de, T: serde::Deserialize<'de>>(
312 &self,
313 name: &str,
314 env_list_keys: &[&str],
315 ) -> Result<T> {
316 let (cfg, def) = self.load_config_with_required(name, false, env_list_keys)?;
317 if def {
318 log::warn!("The configuration for plugin '{name}' does not exist, default values will be used!");
319 }
320 Ok(cfg)
321 }
322
323 fn load_config_with_required<'de, T: serde::Deserialize<'de>>(
324 &self,
325 name: &str,
326 required: bool,
327 env_list_keys: &[&str],
328 ) -> Result<(T, bool)> {
329 let dir = self.dir.trim_end_matches(['/', '\\']);
330 let mut builder =
331 Config::builder().add_source(File::with_name(&format!("{dir}/{name}")).required(required));
332
333 let mut env = config::Environment::with_prefix(&format!("rmqtt_plugin_{}", name.replace('-', "_")));
334 if !env_list_keys.is_empty() {
335 env = env.try_parsing(true).list_separator(" ");
336 for key in env_list_keys {
337 env = env.with_list_parse_key(key);
338 }
339 }
340 builder = builder.add_source(env);
341
342 let s = builder.build()?;
343 let count = s.collect()?.len();
344 Ok((s.try_deserialize::<T>()?, count == 0))
345 }
346}
347
348#[derive(Debug, Clone, Default, Deserialize)]
349pub struct Mqtt {
350 #[serde(default = "Mqtt::delayed_publish_max_default")]
351 pub delayed_publish_max: usize,
352 #[serde(default = "Mqtt::delayed_publish_immediate_default")]
353 pub delayed_publish_immediate: bool,
354 #[serde(default = "Mqtt::max_sessions_default")]
355 pub max_sessions: isize,
356}
357
358impl Mqtt {
359 fn delayed_publish_max_default() -> usize {
360 100_000
361 }
362
363 fn delayed_publish_immediate_default() -> bool {
364 true
365 }
366
367 fn max_sessions_default() -> isize {
368 0
369 }
370}