// elfo_network/config.rs

//! Configuration for the network actors.
//!
//! Note: all types here are exported only for documentation purposes
//! and are not subject to stability guarantees. However, the config
//! structure (usually encoded in TOML) follows stability guarantees.

#[cfg(unix)]
use std::path::PathBuf;
use std::{str::FromStr, time::Duration};

use derive_more::Display;
use eyre::{bail, Result};
use serde::{
    de::{self, Deserializer},
    Deserialize, Serialize,
};

18/// The network actors' config.
19///
20/// # Examples
21/// ```toml
22/// [system.network]
23/// listen = ["tcp://0.0.0.1:8150"]
24/// discovery.predefined = [
25///     "tcp://localhost:4242",
26///     "uds:///tmp/sock"
27/// ]
28/// ```
29#[derive(Debug, Clone, Deserialize)]
30pub struct Config {
31    /// A list of addresses to listen on.
32    ///
33    /// If changed, all existing connections related to removed transports
34    /// are closed immediately. New listeners are created for added
35    /// transports.
36    #[serde(default)]
37    pub listen: Vec<Transport>,
38    /// How to discover other nodes.
39    #[serde(default)]
40    pub discovery: DiscoveryConfig, // TODO: optional?
41    /// Compression settings.
42    #[serde(default)]
43    pub compression: CompressionConfig,
44    /// How often nodes should ping each other.
45    ///
46    /// Pings are used to measure RTT and detect dead connections.
47    /// For the latest purpose, see `idle_timeout`.
48    ///
49    /// `5s` by default.
50    #[serde(with = "humantime_serde", default = "default_ping_interval")]
51    pub ping_interval: Duration,
52    /// The maximum inactivity time of every connection.
53    ///
54    /// If no data is received on a connection for over `idle_timeout` time,
55    /// the connection is considered dead and will be automatically closed.
56    ///
57    /// This timeout is checked every `ping_interval` time, so the actual time
58    /// lies in the range of `idle_timeout` to `idle_timeout + ping_interval`.
59    ///
60    /// `30s` by default.
61    #[serde(with = "humantime_serde", default = "default_idle_timeout")]
62    pub idle_timeout: Duration,
63    /// Enables transport-specific metrics reporting.
64    ///
65    /// Note: Only Linux/TCP is currently supported. See [`libc::tcp_info`].
66    ///
67    /// `false` by default.
68    #[serde(default = "default_transport_specific_metrics")]
69    pub transport_specific_metrics: bool,
70}
71
/// Serde default for `Config::ping_interval`: 5 seconds.
fn default_ping_interval() -> Duration {
    Duration::from_secs(5)
}

/// Serde default for `Config::idle_timeout`: 30 seconds.
fn default_idle_timeout() -> Duration {
    Duration::from_secs(30)
}

/// Serde default for `Config::transport_specific_metrics`: disabled.
fn default_transport_specific_metrics() -> bool {
    false
}

84/// How to discover other nodes.
85#[derive(Debug, Deserialize, Clone)]
86#[serde(default)]
87pub struct DiscoveryConfig {
88    /// Predefined list of transports to connect to.
89    ///
90    /// If changed, all existing connections related to removed transports
91    /// are closed immediately. New connections are created for added
92    /// transports.
93    pub predefined: Vec<Transport>,
94    /// How often to attempt to connect to other nodes.
95    ///
96    /// `10s` by default.
97    #[serde(with = "humantime_serde")]
98    pub attempt_interval: Duration,
99}
100
101impl Default for DiscoveryConfig {
102    fn default() -> Self {
103        Self {
104            predefined: Vec::new(),
105            attempt_interval: Duration::from_secs(10),
106        }
107    }
108}
109
110/// Transport used for communication between nodes.
111#[derive(Debug, Clone, Hash, PartialEq, Eq, Display, Serialize)]
112pub enum Transport {
113    /// TCP transport ("tcp://host:port").
114    #[display("tcp://{_0}")]
115    Tcp(String),
116    /// Unix domain socket transport ("uds://path/to/socket").
117    ///
118    /// Used only on UNIX systems, ignored on other platforms.
119    #[cfg(unix)]
120    #[display("uds://{}", _0.display())]
121    Uds(PathBuf),
122    /// Turmoil v0.6 transport ("turmoil06://host:port").
123    ///
124    /// A port can be omitted, in which case the default port is 57840 (0xE1F0).
125    ///
126    /// Useful for testing purposes only.
127    #[cfg(feature = "turmoil06")]
128    #[display("turmoil06://{_0}")]
129    Turmoil06(String),
130    /// Turmoil v0.7 transport ("turmoil07://host:port").
131    ///
132    /// A port can be omitted, in which case the default port is 57840 (0xE1F0).
133    ///
134    /// Useful for testing purposes only.
135    #[cfg(feature = "turmoil07")]
136    #[display("turmoil07://{_0}")]
137    Turmoil07(String),
138}
139
140impl FromStr for Transport {
141    type Err = eyre::Error;
142
143    fn from_str(s: &str) -> Result<Self> {
144        #[cfg(unix)]
145        const PROTOCOLS: &str = "tcp or uds";
146        #[cfg(not(unix))]
147        const PROTOCOLS: &str = "tcp";
148
149        let (protocol, addr) = s.split_once("://").unwrap_or_default();
150
151        match protocol {
152            "" => bail!("protocol must be specified ({PROTOCOLS})"),
153            "tcp" => Ok(Transport::Tcp(addr.into())),
154            #[cfg(unix)]
155            "uds" => {
156                eyre::ensure!(
157                    !addr.ends_with('/'),
158                    "path to UDS socket cannot be directory"
159                );
160                Ok(Transport::Uds(PathBuf::from(addr)))
161            }
162            #[cfg(feature = "turmoil06")]
163            "turmoil06" => Ok(Transport::Turmoil06(addr.into())),
164            #[cfg(feature = "turmoil07")]
165            "turmoil07" => Ok(Transport::Turmoil07(addr.into())),
166            proto => bail!("unknown protocol: {proto}"),
167        }
168    }
169}
170
171impl<'de> Deserialize<'de> for Transport {
172    fn deserialize<D>(deserializer: D) -> Result<Transport, D::Error>
173    where
174        D: Deserializer<'de>,
175    {
176        // FIXME: cannot use `&str` here: `expected borrowed string`.
177        let s: String = Deserialize::deserialize(deserializer)?;
178
179        s.parse::<Transport>()
180            .map_err(|err| de::Error::custom(format!(r#"unsupported transport: "{s}", {err}"#)))
181    }
182}
183
184/// Compression settings.
185///
186/// # Preference
187///
188/// By default, compression between nodes is supported but not enabled.
189/// If a node is configured with `Preferred` for some compression algorithm,
190/// all connections with this node will use this algorithm, until the other side
191/// explicitly disables it.
192///
193/// See [`Preference`] for more details.
194///
195/// # Example
196///
197/// ```toml
198/// [system.network]
199/// compression.lz4 = "Preferred"
200/// ```
201#[derive(Debug, Default, Deserialize, Clone)]
202pub struct CompressionConfig {
203    /// LZ4 compression algorithm.
204    #[serde(default)]
205    pub lz4: Preference,
206}
207
208/// Preference in a capability.
209///
210/// The following rules apply during handshake between nodes:
211/// * `Preferred` + `Preferred`/`Supported` leads to using the capability;
212/// * `Supported` + `Supported` leads to not using the capability;
213/// * `Disabled` + any leads to not using the capability.
214#[derive(Debug, Clone, Copy, Default, Deserialize)]
215pub enum Preference {
216    /// This is preferred, implies `Supported`.
217    Preferred,
218
219    /// This is just supported.
220    #[default]
221    Supported,
222
223    /// Must not be used.
224    Disabled,
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the `FromStr` implementation of `Transport` for every
    /// protocol compiled into the current build.
    #[test]
    fn transport_parsing() {
        // Missing protocol
        assert!(Transport::from_str("")
            .unwrap_err()
            .to_string()
            .starts_with("protocol must be specified"));
        assert!(Transport::from_str("://a/b")
            .unwrap_err()
            .to_string()
            .starts_with("protocol must be specified"));

        // Unknown protocol
        assert!(Transport::from_str("foo://a")
            .unwrap_err()
            .to_string()
            .starts_with("unknown protocol"));
        #[cfg(not(unix))]
        assert!(Transport::from_str("uds://a")
            .unwrap_err()
            .to_string()
            .starts_with("unknown protocol"));

        // TCP
        assert_eq!(
            Transport::from_str("tcp://127.0.0.1:4242").unwrap(),
            Transport::Tcp("127.0.0.1:4242".into())
        );
        assert_eq!(
            Transport::from_str("tcp://alice:4242").unwrap(),
            Transport::Tcp("alice:4242".into())
        );

        // UDS
        #[cfg(unix)]
        {
            assert_eq!(
                Transport::from_str("uds:///a/b").unwrap(),
                Transport::Uds("/a/b".into())
            );
            assert_eq!(
                Transport::from_str("uds://rel/a/b").unwrap(),
                Transport::Uds("rel/a/b".into())
            );
            assert_eq!(
                Transport::from_str("uds:///a/").unwrap_err().to_string(),
                "path to UDS socket cannot be directory"
            );
        }

        // Turmoil06
        #[cfg(feature = "turmoil06")]
        assert_eq!(
            Transport::from_str("turmoil06://alice").unwrap(),
            Transport::Turmoil06("alice".into())
        );

        // Turmoil07
        #[cfg(feature = "turmoil07")]
        assert_eq!(
            Transport::from_str("turmoil07://alice").unwrap(),
            Transport::Turmoil07("alice".into())
        );
    }

    /// Checks the `Display` implementation of `Transport`.
    #[test]
    fn transport_display() {
        // TCP is available on every platform, so this test always
        // asserts something, even on non-UNIX targets.
        assert_eq!(
            Transport::Tcp("127.0.0.1:4242".into()).to_string(),
            "tcp://127.0.0.1:4242"
        );

        #[cfg(unix)]
        assert_eq!(
            Transport::Uds(PathBuf::from("/some/path")).to_string(),
            "uds:///some/path"
        );
    }
}