Skip to main content

rmqtt_conf/
lib.rs

1#![deny(unsafe_code)]
2
3use std::fmt;
4use std::net::SocketAddr;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use config::{Config, File, Source};
11use once_cell::sync::OnceCell;
12use serde::Deserialize;
13
14use rmqtt_net::Result;
15use rmqtt_utils::*;
16
17use self::listener::Listeners;
18use self::logging::Log;
19
20pub use self::listener::Listener;
21pub use self::options::Options;
22
23pub mod listener;
24pub mod logging;
25pub mod options;
26
/// Process-wide settings slot: written once by `Settings::init` and read by
/// `Settings::instance`.
27static SETTINGS: OnceCell<Settings> = OnceCell::new();
28
/// Handle to the loaded configuration.
///
/// The deserialized [`Inner`] value is held behind an `Arc`, so cloning a
/// `Settings` clones the `Arc` rather than the configuration data.
29#[derive(Clone)]
30pub struct Settings(Arc<Inner>);
31
32#[derive(Debug, Clone, Deserialize)]
33pub struct Inner {
34    #[serde(default)]
35    pub task: Task,
36    #[serde(default)]
37    pub node: Node,
38    #[serde(default)]
39    pub rpc: Rpc,
40    #[serde(default)]
41    pub log: Log,
42    #[serde(rename = "listener")]
43    #[serde(default)]
44    pub listeners: Listeners,
45    #[serde(default)]
46    pub plugins: Plugins,
47    #[serde(default)]
48    pub mqtt: Mqtt,
49    #[serde(default, skip)]
50    pub opts: Options,
51}
52
53impl Deref for Settings {
54    type Target = Inner;
55    fn deref(&self) -> &Self::Target {
56        self.0.as_ref()
57    }
58}
59
60impl Settings {
61    fn new(opts: Options) -> Result<Self> {
62        let mut builder = Config::builder()
63            .add_source(File::with_name("/etc/rmqtt/rmqtt").required(false))
64            .add_source(File::with_name("/etc/rmqtt").required(false))
65            .add_source(File::with_name("rmqtt").required(false))
66            .add_source(
67                config::Environment::with_prefix("rmqtt")
68                    .try_parsing(true)
69                    .list_separator(" ")
70                    .with_list_parse_key("plugins.default_startups"),
71            );
72
73        if let Some(cfg) = opts.cfg_name.as_ref() {
74            builder = builder.add_source(File::with_name(cfg).required(false));
75        }
76
77        let mut inner: Inner = builder.build()?.try_deserialize()?;
78
79        inner.listeners.init();
80        if inner.listeners.tcps.is_empty() && inner.listeners.tlss.is_empty() {
81            //set default
82            inner.listeners.set_default();
83        }
84
85        //Command line configuration overriding file configuration
86        if let Some(id) = opts.node_id {
87            if id > 0 {
88                inner.node.id = id;
89            }
90        }
91        if let Some(plugins_default_startups) = opts.plugins_default_startups.as_ref() {
92            inner.plugins.default_startups.clone_from(plugins_default_startups)
93        }
94
95        inner.opts = opts;
96        Ok(Self(Arc::new(inner)))
97    }
98
99    #[inline]
100    pub fn instance() -> &'static Self {
101        match SETTINGS.get() {
102            Some(c) => c,
103            None => {
104                unreachable!("Settings not initialized");
105            }
106        }
107    }
108
109    #[inline]
110    pub fn init(opts: Options) -> Result<&'static Self> {
111        SETTINGS.set(Settings::new(opts)?).map_err(|_| anyhow!("Settings init failed"))?;
112        SETTINGS.get().ok_or_else(|| anyhow!("Settings init failed"))
113    }
114
115    #[inline]
116    pub fn logs() -> Result<()> {
117        let cfg = Self::instance();
118        log::debug!("Config info is {:?}", cfg.0);
119        log::info!("node_id is {}", cfg.node.id);
120        log::info!("exec_workers is {}", cfg.task.exec_workers);
121        log::info!("exec_queue_max is {}", cfg.task.exec_queue_max);
122        log::info!("node.busy config is: {:?}", cfg.node.busy);
123        log::info!("node.rpc config is: {:?}", cfg.rpc);
124
125        if cfg.opts.node_grpc_addrs.is_some() {
126            log::info!("node_grpc_addrs is {:?}", cfg.opts.node_grpc_addrs);
127        }
128        if cfg.opts.raft_peer_addrs.is_some() {
129            log::info!("raft_peer_addrs is {:?}", cfg.opts.raft_peer_addrs);
130        }
131        if cfg.opts.raft_leader_id.is_some() {
132            log::info!("raft_leader_id is {:?}", cfg.opts.raft_leader_id);
133        }
134        Ok(())
135    }
136}
137
138impl fmt::Debug for Settings {
139    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
140        write!(f, "Settings ...")?;
141        Ok(())
142    }
143}
144
145#[derive(Debug, Clone, Deserialize)]
146pub struct Task {
147    //Concurrent task count for global task executor.
148    #[serde(default = "Task::exec_workers_default")]
149    pub exec_workers: usize,
150
151    //Queue capacity for global task executor.
152    #[serde(default = "Task::exec_queue_max_default")]
153    pub exec_queue_max: usize,
154}
155
156impl Default for Task {
157    #[inline]
158    fn default() -> Self {
159        Self { exec_workers: Self::exec_workers_default(), exec_queue_max: Self::exec_queue_max_default() }
160    }
161}
162
163impl Task {
164    fn exec_workers_default() -> usize {
165        1000
166    }
167    fn exec_queue_max_default() -> usize {
168        300_000
169    }
170}
171
172#[derive(Default, Debug, Clone, Deserialize)]
173pub struct Node {
174    #[serde(default)]
175    pub id: NodeId,
176    #[serde(default = "Node::cookie_default")]
177    pub cookie: String,
178    // #[serde(default = "Node::crash_dump_default")]
179    // pub crash_dump: String,
180    #[serde(default)]
181    pub busy: Busy,
182}
183
184impl Node {
185    fn cookie_default() -> String {
186        "rmqttsecretcookie".into()
187    }
188    // fn crash_dump_default() -> String {
189    //     "/var/log/rmqtt/crash.dump".into()
190    // }
191}
192
193#[derive(Debug, Clone, Deserialize)]
194pub struct Busy {
195    //Busy status check switch
196    #[serde(default = "Busy::check_enable_default")]
197    pub check_enable: bool,
198    //Busy status update interval
199    #[serde(default = "Busy::update_interval_default", deserialize_with = "deserialize_duration")]
200    pub update_interval: Duration,
201    //The threshold for the 1-minute average system load used to determine system busyness.
202    #[serde(default = "Busy::loadavg_default")]
203    pub loadavg: f32, //70.0
204    //The threshold for average CPU load used to determine system busyness.
205    #[serde(default = "Busy::cpuloadavg_default")]
206    pub cpuloadavg: f32, //80.0
207    //The threshold for determining high-concurrency connection handshakes in progress.
208    #[serde(default)]
209    pub handshaking: isize, //0
210}
211
212impl Default for Busy {
213    #[inline]
214    fn default() -> Self {
215        Self {
216            check_enable: Self::check_enable_default(),
217            update_interval: Self::update_interval_default(),
218            loadavg: Self::loadavg_default(),
219            cpuloadavg: Self::cpuloadavg_default(),
220            handshaking: 0,
221        }
222    }
223}
224
225impl Busy {
226    fn check_enable_default() -> bool {
227        true
228    }
229    fn update_interval_default() -> Duration {
230        Duration::from_secs(2)
231    }
232    fn loadavg_default() -> f32 {
233        80.0
234    }
235
236    fn cpuloadavg_default() -> f32 {
237        90.0
238    }
239}
240
241#[derive(Debug, Clone, Deserialize)]
242pub struct Rpc {
243    #[serde(default = "Rpc::server_addr_default", deserialize_with = "deserialize_addr")]
244    pub server_addr: SocketAddr,
245
246    #[serde(default = "Rpc::reuseaddr_default")]
247    pub reuseaddr: bool,
248
249    #[serde(default = "Rpc::reuseport_default")]
250    pub reuseport: bool,
251}
252
253impl Default for Rpc {
254    #[inline]
255    fn default() -> Self {
256        Self {
257            reuseaddr: Self::reuseaddr_default(),
258            reuseport: Self::reuseport_default(),
259            server_addr: Self::server_addr_default(),
260        }
261    }
262}
263
264impl Rpc {
265    fn reuseaddr_default() -> bool {
266        true
267    }
268    fn reuseport_default() -> bool {
269        false
270    }
271    fn server_addr_default() -> SocketAddr {
272        ([0, 0, 0, 0], 5363).into()
273    }
274}
275
276#[derive(Default, Debug, Clone, Deserialize)]
277pub struct Plugins {
278    #[serde(default = "Plugins::dir_default")]
279    pub dir: String,
280    #[serde(default)]
281    pub default_startups: Vec<String>,
282}
283
284impl Plugins {
285    fn dir_default() -> String {
286        "./plugins/".into()
287    }
288
289    pub fn load_config<'de, T: serde::Deserialize<'de>>(&self, name: &str) -> Result<T> {
290        let (cfg, _) = self.load_config_with_required(name, true, &[])?;
291        Ok(cfg)
292    }
293
294    pub fn load_config_default<'de, T: serde::Deserialize<'de>>(&self, name: &str) -> Result<T> {
295        let (cfg, def) = self.load_config_with_required(name, false, &[])?;
296        if def {
297            log::warn!("The configuration for plugin '{name}' does not exist, default values will be used!");
298        }
299        Ok(cfg)
300    }
301
302    pub fn load_config_with<'de, T: serde::Deserialize<'de>>(
303        &self,
304        name: &str,
305        env_list_keys: &[&str],
306    ) -> Result<T> {
307        let (cfg, _) = self.load_config_with_required(name, true, env_list_keys)?;
308        Ok(cfg)
309    }
310
311    pub fn load_config_default_with<'de, T: serde::Deserialize<'de>>(
312        &self,
313        name: &str,
314        env_list_keys: &[&str],
315    ) -> Result<T> {
316        let (cfg, def) = self.load_config_with_required(name, false, env_list_keys)?;
317        if def {
318            log::warn!("The configuration for plugin '{name}' does not exist, default values will be used!");
319        }
320        Ok(cfg)
321    }
322
323    fn load_config_with_required<'de, T: serde::Deserialize<'de>>(
324        &self,
325        name: &str,
326        required: bool,
327        env_list_keys: &[&str],
328    ) -> Result<(T, bool)> {
329        let dir = self.dir.trim_end_matches(['/', '\\']);
330        let mut builder =
331            Config::builder().add_source(File::with_name(&format!("{dir}/{name}")).required(required));
332
333        let mut env = config::Environment::with_prefix(&format!("rmqtt_plugin_{}", name.replace('-', "_")));
334        if !env_list_keys.is_empty() {
335            env = env.try_parsing(true).list_separator(" ");
336            for key in env_list_keys {
337                env = env.with_list_parse_key(key);
338            }
339        }
340        builder = builder.add_source(env);
341
342        let s = builder.build()?;
343        let count = s.collect()?.len();
344        Ok((s.try_deserialize::<T>()?, count == 0))
345    }
346}
347
348#[derive(Debug, Clone, Default, Deserialize)]
349pub struct Mqtt {
350    #[serde(default = "Mqtt::delayed_publish_max_default")]
351    pub delayed_publish_max: usize,
352    #[serde(default = "Mqtt::delayed_publish_immediate_default")]
353    pub delayed_publish_immediate: bool,
354    #[serde(default = "Mqtt::max_sessions_default")]
355    pub max_sessions: isize,
356}
357
358impl Mqtt {
359    fn delayed_publish_max_default() -> usize {
360        100_000
361    }
362
363    fn delayed_publish_immediate_default() -> bool {
364        true
365    }
366
367    fn max_sessions_default() -> isize {
368        0
369    }
370}