Skip to main content

zenoh_config/
lib.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
20//!
21//! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants.
22#![allow(deprecated)]
23
24pub mod defaults;
25mod include;
26pub mod qos;
27pub mod wrappers;
28
29#[allow(unused_imports)]
30use std::convert::TryFrom;
31// This is a false positive from the rust analyser
32use std::{
33    any::Any,
34    collections::HashSet,
35    fmt,
36    io::Read,
37    net::SocketAddr,
38    num::{NonZeroU16, NonZeroUsize},
39    ops::{self, Bound, Deref, RangeBounds},
40    path::Path,
41    sync::{Arc, Weak},
42};
43
44use include::recursive_include;
45use nonempty_collections::NEVec;
46use qos::{PublisherQoSConfList, QosFilter, QosOverwriteMessage, QosOverwrites};
47use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
48use serde::{Deserialize, Serialize};
49use serde_json::{Map, Value};
50use validated_struct::ValidatedMapAssociatedTypes;
51pub use validated_struct::{GetError, ValidatedMap};
52pub use wrappers::ZenohId;
53pub use zenoh_protocol::core::{
54    whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor,
55};
56use zenoh_protocol::{
57    core::{
58        key_expr::{OwnedKeyExpr, OwnedNonWildKeyExpr},
59        Bits,
60    },
61    transport::{BatchSize, TransportSn},
62};
63use zenoh_result::{bail, zerror, ZResult};
64use zenoh_util::{LibLoader, LibSearchDirs};
65
66pub mod mode_dependent;
67pub use mode_dependent::*;
68
69pub mod connection_retry;
70pub use connection_retry::*;
71
72// Wrappers for secrecy of values
/// Newtype over `String` used as the payload type of [`SecretValue`].
73#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
74pub struct SecretString(String);
75
76impl ops::Deref for SecretString {
77    type Target = String;
78
79    fn deref(&self) -> &Self::Target {
80        &self.0
81    }
82}
83
// Empty impls (trait defaults only) opting `SecretString` into the `secrecy`
// serialization, debug and clone capabilities.
84impl SerializableSecret for SecretString {}
85impl DebugSecret for SecretString {}
86impl CloneableSecret for SecretString {}
87impl Zeroize for SecretString {
88    fn zeroize(&mut self) {
89        self.0 = "".to_string();
90    }
91}
92
/// A [`SecretString`] wrapped in `secrecy`'s `Secret` container.
93pub type SecretValue = Secret<SecretString>;
94
/// Weight of the link from this node towards one destination node.
95#[derive(Debug, Deserialize, Serialize, Clone)]
96pub struct TransportWeight {
97    /// The zid of the destination node.
98    pub dst_zid: ZenohId,
99    /// The weight of the link from this node to the destination (`NonZeroU16`, so never zero).
100    pub weight: NonZeroU16,
101}
102
/// Flow direction used by the interceptor configuration items (`flows` fields).
/// Serialized in snake_case: `egress`, `ingress`.
103#[derive(Debug, Deserialize, Serialize, Clone, Copy, Eq, PartialEq)]
104#[serde(rename_all = "snake_case")]
105pub enum InterceptorFlow {
106    Egress,
107    Ingress,
108}
109
/// Message types that can be listed in `DownsamplingItemConf::messages`.
/// Serialized in snake_case.
110#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
111#[serde(rename_all = "snake_case")]
112pub enum DownsamplingMessage {
113    Delete,
    // Still accepted in configuration, but `downsampling_validator` logs a warning when it is used.
114    #[deprecated = "Use `Put` or `Delete` instead."]
115    Push,
116    Put,
117    Query,
118    Reply,
119}
120
/// A single downsampling rule: a key expression and the maximum frequency allowed on it.
121#[derive(Debug, Deserialize, Serialize, Clone)]
122#[serde(deny_unknown_fields)]
123pub struct DownsamplingRuleConf {
124    /// The key expression to which the downsampling rule applies.
125    /// This field is required: it is a plain `OwnedKeyExpr`, not an `Option`.
126    pub key_expr: OwnedKeyExpr,
127    /// The maximum frequency in Hertz.
128    pub freq: f64,
129}
130
/// One downsampling configuration item: which interfaces, link protocols, message
/// types and flows it targets, and the rules to apply.
131#[derive(Debug, Deserialize, Serialize, Clone)]
132#[serde(deny_unknown_fields)]
133pub struct DownsamplingItemConf {
134    /// Optional identifier for the downsampling configuration item
135    pub id: Option<String>,
136    /// A list of interfaces to which the downsampling will be applied
137    /// Downsampling will be applied for all interfaces if the parameter is None
138    pub interfaces: Option<NEVec<String>>,
139    /// A list of link types, transports having one of those link types will have the downsampling applied
140    /// Downsampling will be applied for all link types if the parameter is None
141    pub link_protocols: Option<NEVec<InterceptorLink>>,
142    /// A list of message types on which the downsampling will be applied
143    pub messages: NEVec<DownsamplingMessage>,
144    /// A list of downsampling rules: key_expression and the maximum frequency in Hertz
145    pub rules: NEVec<DownsamplingRuleConf>,
146    /// Downsampling flow directions: egress and/or ingress
147    pub flows: Option<NEVec<InterceptorFlow>>,
148}
149
150fn downsampling_validator(d: &Vec<DownsamplingItemConf>) -> bool {
151    for item in d {
152        if item
153            .messages
154            .iter()
155            .any(|m| *m == DownsamplingMessage::Push)
156        {
157            tracing::warn!("In 'downsampling/messages' configuration: 'push' is deprecated and may not be supported in future versions, use 'put' and/or 'delete' instead");
158        }
159    }
160    true
161}
162
/// One low-pass filter configuration item.
///
/// NOTE(review): `id`, `interfaces`, `link_protocols` and `flows` presumably carry the
/// same meaning as the same-named fields of `DownsamplingItemConf` — confirm.
163#[derive(Serialize, Debug, Deserialize, Clone)]
164#[serde(deny_unknown_fields)]
165pub struct LowPassFilterConf {
166    pub id: Option<String>,
167    pub interfaces: Option<NEVec<String>>,
168    pub link_protocols: Option<NEVec<InterceptorLink>>,
169    pub flows: Option<NEVec<InterceptorFlow>>,
    /// Message types this item lists (non-empty).
170    pub messages: NEVec<LowPassFilterMessage>,
    /// Key expressions this item lists (non-empty).
171    pub key_exprs: NEVec<OwnedKeyExpr>,
    /// Size limit of the filter. NOTE(review): presumably a payload size in bytes — confirm.
172    pub size_limit: usize,
173}
174
/// Message types that can be listed in `LowPassFilterConf::messages`.
/// Serialized in snake_case.
175#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
176#[serde(rename_all = "snake_case")]
177pub enum LowPassFilterMessage {
178    Put,
179    Delete,
180    Query,
181    Reply,
182}
183
/// An ACL rule: a permission for a set of message types on a set of key expressions,
/// optionally limited to given flow directions.
184#[derive(Serialize, Debug, Deserialize, Clone)]
185#[serde(deny_unknown_fields)]
186pub struct AclConfigRule {
    /// Identifier of the rule.
187    pub id: String,
188    pub key_exprs: NEVec<OwnedKeyExpr>,
189    pub messages: NEVec<AclMessage>,
190    pub flows: Option<NEVec<InterceptorFlow>>,
191    pub permission: Permission,
192}
193
/// A named ACL subject, described by optional lists of interfaces, certificate
/// common names, usernames, link protocols and zids.
194#[derive(Serialize, Debug, Deserialize, Clone)]
195#[serde(deny_unknown_fields)]
196pub struct AclConfigSubjects {
    /// Identifier of the subject.
197    pub id: String,
198    pub interfaces: Option<NEVec<Interface>>,
199    pub cert_common_names: Option<NEVec<CertCommonName>>,
200    pub usernames: Option<NEVec<Username>>,
201    pub link_protocols: Option<NEVec<InterceptorLink>>,
202    pub zids: Option<NEVec<ZenohId>>,
203}
204
/// Range of `u64` values whose bounds are both optional.
///
/// A present bound is inclusive; a missing bound leaves that side unbounded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfRange {
    /// Inclusive lower bound, or `None` when unbounded.
    start: Option<u64>,
    /// Inclusive upper bound, or `None` when unbounded.
    end: Option<u64>,
}

impl ConfRange {
    /// Builds a range from its optional inclusive bounds.
    pub fn new(start: Option<u64>, end: Option<u64>) -> Self {
        ConfRange { start, end }
    }
}

impl RangeBounds<u64> for ConfRange {
    /// Lower bound: `Included` when set, `Unbounded` otherwise.
    fn start_bound(&self) -> Bound<&u64> {
        self.start
            .as_ref()
            .map_or(Bound::Unbounded, Bound::Included)
    }

    /// Upper bound: `Included` when set, `Unbounded` otherwise.
    fn end_bound(&self) -> Bound<&u64> {
        self.end.as_ref().map_or(Bound::Unbounded, Bound::Included)
    }
}
231
232impl serde::Serialize for ConfRange {
233    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
234    where
235        S: serde::Serializer,
236    {
237        serializer.serialize_str(&format!(
238            "{}..{}",
239            self.start.unwrap_or_default(),
240            self.end.unwrap_or_default()
241        ))
242    }
243}
244
245impl<'a> serde::Deserialize<'a> for ConfRange {
246    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
247    where
248        D: serde::Deserializer<'a>,
249    {
250        struct V;
251
252        impl serde::de::Visitor<'_> for V {
253            type Value = ConfRange;
254
255            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
256                formatter.write_str("range string")
257            }
258
259            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
260            where
261                E: serde::de::Error,
262            {
263                let (start, end) = v
264                    .split_once("..")
265                    .ok_or_else(|| serde::de::Error::custom("invalid range"))?;
266                let parse_bound = |bound: &str| {
267                    (!bound.is_empty())
268                        .then(|| bound.parse::<u64>())
269                        .transpose()
270                        .map_err(|_| serde::de::Error::custom("invalid range bound"))
271                };
272                Ok(ConfRange::new(parse_bound(start)?, parse_bound(end)?))
273            }
274        }
275        deserializer.deserialize_str(V)
276    }
277}
278
/// One QoS overwrite configuration item: which peers, interfaces, link protocols,
/// messages, key expressions and flows it targets, and the QoS values to overwrite with.
279#[derive(Debug, Deserialize, Serialize, Clone)]
280#[serde(deny_unknown_fields)]
281pub struct QosOverwriteItemConf {
282    /// Optional identifier for the qos modification configuration item.
283    pub id: Option<String>,
284    /// A list of ZIDs on which qos will be overwritten when communicating with.
285    pub zids: Option<NEVec<ZenohId>>,
286    /// A list of interfaces to which the qos will be applied.
287    /// QosOverwrite will be applied for all interfaces if the parameter is None.
288    pub interfaces: Option<NEVec<String>>,
289    /// A list of link types, transports having one of those link types will have the qos overwrite applied
290    /// Qos overwrite will be applied for all link types if the parameter is None.
291    pub link_protocols: Option<NEVec<InterceptorLink>>,
292    /// List of message types on which the qos overwrite will be applied.
293    pub messages: NEVec<QosOverwriteMessage>,
294    /// List of key expressions to apply qos overwrite.
295    pub key_exprs: Option<NEVec<OwnedKeyExpr>>,
296    /// The qos value to overwrite with.
297    pub overwrite: QosOverwrites,
298    /// QosOverwrite flow directions: egress and/or ingress.
299    pub flows: Option<NEVec<InterceptorFlow>>,
300    /// QoS filter to apply to the messages matching this item.
301    pub qos: Option<QosFilter>,
302    /// payload_size range for the messages matching this item.
303    pub payload_size: Option<ConfRange>,
304}
305
/// Newtype over a `String` naming an interface (used in `AclConfigSubjects::interfaces`).
306#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
307pub struct Interface(pub String);
308
309impl std::fmt::Display for Interface {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        write!(f, "Interface({})", self.0)
312    }
313}
314
/// Newtype over a `String` holding a certificate common name
/// (used in `AclConfigSubjects::cert_common_names`).
315#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
316pub struct CertCommonName(pub String);
317
318impl std::fmt::Display for CertCommonName {
319    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320        write!(f, "CertCommonName({})", self.0)
321    }
322}
323
/// Newtype over a `String` holding a username (used in `AclConfigSubjects::usernames`).
324#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
325pub struct Username(pub String);
326
327impl std::fmt::Display for Username {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        write!(f, "Username({})", self.0)
330    }
331}
332
/// Link protocols that can be listed in the `link_protocols` fields of the
/// interceptor configuration items. Serialized in kebab-case
/// (e.g. `UnixsockStream` becomes `unixsock-stream`).
333#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
334#[serde(rename_all = "kebab-case")]
335pub enum InterceptorLink {
336    Tcp,
337    Udp,
338    Tls,
339    Quic,
340    Serial,
341    Unixpipe,
342    UnixsockStream,
343    Vsock,
344    Ws,
345}
346
347impl std::fmt::Display for InterceptorLink {
348    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
349        write!(f, "Transport({self:?})")
350    }
351}
352
/// An ACL policy entry associating a list of rules with a list of subjects.
///
/// NOTE(review): the strings presumably reference `AclConfigRule::id` and
/// `AclConfigSubjects::id` — confirm against the code consuming this structure.
353#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
354#[serde(deny_unknown_fields)]
355pub struct AclConfigPolicyEntry {
    /// Optional identifier of the policy entry.
356    pub id: Option<String>,
357    pub rules: Vec<String>,
358    pub subjects: Vec<String>,
359}
360
/// A single policy rule: one permission for one subject, key expression,
/// message type and flow direction.
361#[derive(Clone, Serialize, Debug, Deserialize)]
362#[serde(deny_unknown_fields)]
363pub struct PolicyRule {
    // NOTE(review): numeric subject identifier; how it maps to configured subjects is not visible here.
364    pub subject_id: usize,
365    pub key_expr: OwnedKeyExpr,
366    pub message: AclMessage,
367    pub permission: Permission,
368    pub flow: InterceptorFlow,
369}
370
/// Message types that ACL rules can refer to. Serialized in snake_case
/// (e.g. `DeclareSubscriber` becomes `declare_subscriber`).
371#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
372#[serde(rename_all = "snake_case")]
373pub enum AclMessage {
374    Put,
375    Delete,
376    DeclareSubscriber,
377    Query,
378    DeclareQueryable,
379    Reply,
380    LivelinessToken,
381    DeclareLivelinessSubscriber,
382    LivelinessQuery,
383}
384
/// Permission carried by an ACL rule. Serialized in snake_case: `allow`, `deny`.
385#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
386#[serde(rename_all = "snake_case")]
387pub enum Permission {
388    Allow,
389    Deny,
390}
391
392/// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
/// Serialized in kebab-case: `always`, `greater-zid`.
393#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
394#[serde(rename_all = "kebab-case")]
395pub enum AutoConnectStrategy {
396    /// Always attempt to connect to another node. This may result in a redundant connection,
397    /// which will then be closed.
398    #[default]
399    Always,
400    /// A node will attempt to connect to another one only if its own zid is greater than the
401    /// other one. If both nodes use this strategy, only one will attempt the connection.
402    /// This strategy may not be suited if one of the nodes is not reachable by the other one,
403    /// for example because of a private IP.
404    GreaterZid,
405}
406
/// Statistics filter entry holding a single key expression.
407#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
408pub struct StatsFilterConfig {
409    pub key: OwnedKeyExpr,
410}
411
/// Hook for checking a plugin configuration change.
412pub trait ConfigValidator: Send + Sync {
    /// Checks a configuration change from `_current` to `_new` for the plugin
    /// `_plugin_name` at `_path`.
    ///
    /// The default implementation ignores all arguments and returns `Ok(None)`.
    /// NOTE(review): presumably `Ok(Some(map))` lets an implementation substitute an
    /// adjusted configuration and `Err` rejects the change — confirm at the call sites.
413    fn check_config(
414        &self,
415        _plugin_name: &str,
416        _path: &str,
417        _current: &serde_json::Map<String, serde_json::Value>,
418        _new: &serde_json::Map<String, serde_json::Value>,
419    ) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
420        Ok(None)
421    }
422}
423
424// Necessary to allow setting a default empty weak reference value on the plugin.validator field,
425// because an empty weak value is not allowed for Arc<dyn Trait>
426impl ConfigValidator for () {}
427
428/// Creates an empty zenoh net Session configuration.
429pub fn empty() -> Config {
430    Config::default()
431}
432
433/// Creates a default zenoh net Session configuration (equivalent to `peer`).
///
/// Simply delegates to [`peer`].
434pub fn default() -> Config {
435    peer()
436}
437
438/// Creates a default `'peer'` mode zenoh net Session configuration.
439pub fn peer() -> Config {
440    let mut config = Config::default();
441    config.set_mode(Some(WhatAmI::Peer)).unwrap();
442    config
443}
444
445/// Creates a default `'client'` mode zenoh net Session configuration.
446pub fn client<I: IntoIterator<Item = T>, T: Into<EndPoint>>(peers: I) -> Config {
447    let mut config = Config::default();
448    config.set_mode(Some(WhatAmI::Client)).unwrap();
449    config.connect.endpoints =
450        ModeDependentValue::Unique(peers.into_iter().map(|t| t.into()).collect());
451    config
452}
453
// Smoke test: builds a default `Config` and dumps its keys with `dbg!`; it asserts nothing.
454#[test]
455fn config_keys() {
456    let c = Config::default();
457    dbg!(Vec::from_iter(c.keys()));
458}
459
460validated_struct::validator! {
461    #[derive(Default)]
462    #[recursive_attrs]
463    #[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
464    #[serde(default)]
465    #[serde(deny_unknown_fields)]
466    #[doc(hidden)]
467    Config {
468        /// The Zenoh ID of the instance. This ID MUST be unique throughout your Zenoh infrastructure and cannot exceed 16 bytes of length. If left unset, a random u128 will be generated.
469        /// If not specified a random Zenoh ID will be generated upon session creation.
470        id: Option<ZenohId>,
471        /// The metadata of the instance. Arbitrary json data available from the admin space
472        metadata: Value,
473        /// The node's mode ("router" (default value in `zenohd`), "peer" or "client").
474        mode: Option<whatami::WhatAmI>,
475        /// Which zenoh nodes to connect to.
476        pub connect:
477        ConnectConfig {
478            /// global timeout for full connect cycle
479            pub timeout_ms: Option<ModeDependentValue<i64>>,
480            /// The list of endpoints to connect to
481            pub endpoints: ModeDependentValue<Vec<EndPoint>>,
482            /// if connection timeout exceed, exit from application
483            pub exit_on_failure: Option<ModeDependentValue<bool>>,
484            pub retry: Option<connection_retry::ConnectionRetryModeDependentConf>,
485        },
486        /// Which endpoints to listen on.
487        pub listen:
488        ListenConfig {
489            /// global timeout for full listen cycle
490            pub timeout_ms: Option<ModeDependentValue<i64>>,
491            /// The list of endpoints to listen on
492            pub endpoints: ModeDependentValue<Vec<EndPoint>>,
493            /// if connection timeout exceed, exit from application
494            pub exit_on_failure: Option<ModeDependentValue<bool>>,
495            pub retry: Option<connection_retry::ConnectionRetryModeDependentConf>,
496        },
497        /// Configure the session open behavior.
498        pub open: #[derive(Default)]
499        OpenConf {
500            /// Configure the conditions to be met before session open returns.
501            pub return_conditions: #[derive(Default)]
502            ReturnConditionsConf {
503                /// Session open waits to connect to scouted peers and routers before returning.
504                /// When set to false, first publications and queries after session open from peers may be lost.
505                connect_scouted: Option<bool>,
506                /// Session open waits to receive initial declares from connected peers before returning.
507                /// Setting to false may cause extra traffic at startup from peers.
508                declares: Option<bool>,
509            },
510        },
511        pub scouting: #[derive(Default)]
512        ScoutingConf {
513            /// In client mode, the period dedicated to scouting for a router before failing. In milliseconds.
514            timeout: Option<u64>,
515            /// In peer mode, the period dedicated to scouting remote peers before attempting other operations. In milliseconds.
516            delay: Option<u64>,
517            /// The multicast scouting configuration.
518            pub multicast: #[derive(Default)]
519            ScoutingMulticastConf {
520                /// Whether multicast scouting is enabled or not. If left empty, `zenohd` will set it according to the presence of the `--no-multicast-scouting` argument.
521                enabled: Option<bool>,
522                /// The socket which should be used for multicast scouting. `zenohd` will use `224.0.0.224:7446` by default if none is provided.
523                address: Option<SocketAddr>,
524                /// The network interface which should be used for multicast scouting. `zenohd` will automatically select an interface if none is provided.
525                interface: Option<String>,
526                /// The time-to-live on multicast scouting packets. (default: 1)
527                pub ttl: Option<u32>,
528                /// Which type of Zenoh instances to automatically establish sessions with upon discovery through UDP multicast.
529                autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
530                /// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
531                autoconnect_strategy: Option<ModeDependentValue<TargetDependentValue<AutoConnectStrategy>>>,
532                /// Whether or not to listen for scout messages on UDP multicast and reply to them.
533                listen: Option<ModeDependentValue<bool>>,
534            },
535            /// The gossip scouting configuration.
536            pub gossip: #[derive(Default)]
537            GossipConf {
538                /// Whether gossip scouting is enabled or not.
539                enabled: Option<bool>,
540                /// When true, gossip scouting information are propagated multiple hops to all nodes in the local network.
541                /// When false, gossip scouting information are only propagated to the next hop.
542                /// Activating multihop gossip implies more scouting traffic and a lower scalability.
543                /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
544                /// direct connectivity with each other.
545                multihop: Option<bool>,
546                /// Which type of Zenoh instances to send gossip messages to.
547                target: Option<ModeDependentValue<WhatAmIMatcher>>,
548                /// Which type of Zenoh instances to automatically establish sessions with upon discovery through gossip.
549                autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
550                /// Strategy for autoconnection, mainly to avoid nodes connecting to each other redundantly.
551                autoconnect_strategy: Option<ModeDependentValue<TargetDependentValue<AutoConnectStrategy>>>,
552            },
553        },
554
555        /// Configuration of data messages timestamps management.
556        pub timestamping: #[derive(Default)]
557        TimestampingConf {
558            /// Whether data messages should be timestamped if not already.
559            enabled: Option<ModeDependentValue<bool>>,
560            /// Whether data messages with timestamps in the future should be dropped or not.
561            /// If set to false (default), messages with timestamps in the future are retimestamped.
562            /// Timestamps are ignored if timestamping is disabled.
563            drop_future_timestamp: Option<bool>,
564        },
565
566        /// The default timeout to apply to queries in milliseconds.
567        queries_default_timeout: Option<u64>,
568
569        /// The routing strategy to use and it's configuration.
570        pub routing: #[derive(Default)]
571        RoutingConf {
572            /// The routing strategy to use in routers and it's configuration.
573            pub router: #[derive(Default)]
574            RouterRoutingConf {
575                /// When set to true a router will forward data between two peers
576                /// directly connected to it if it detects that those peers are not
577                /// connected to each other.
578                /// The failover brokering only works if gossip discovery is enabled.
579                peers_failover_brokering: Option<bool>,
580                /// Linkstate mode configuration.
581                pub linkstate: #[derive(Default)]
582                LinkstateConf {
583                    /// Weights of the outgoing links in linkstate mode.
584                    /// If none of the two endpoint nodes of a transport specifies its weight, a weight of 100 is applied.
585                    /// If only one of the two endpoint nodes of a transport specifies its weight, the specified weight is applied.
586                    /// If both endpoint nodes of a transport specify its weight, the greater weight is applied.
587                    pub transport_weights: Vec<TransportWeight>,
588                },
589            },
590            /// The routing strategy to use in peers and it's configuration.
591            pub peer: #[derive(Default)]
592            PeerRoutingConf {
593                /// The routing strategy to use in peers. ("peer_to_peer" or "linkstate").
594                /// This option needs to be set to the same value in all peers and routers of the subsystem.
595                mode: Option<String>,
596                /// Linkstate mode configuration (only taken into account if mode == "linkstate").
597                pub linkstate: LinkstateConf,
598            },
599            /// The interests-based routing configuration.
600            /// This configuration applies regardless of the mode (router, peer or client).
601            pub interests: #[derive(Default)]
602            InterestsConf {
603                /// The timeout to wait for incoming interests declarations.
604                timeout: Option<u64>,
605            },
606        },
607
608        /// The declarations aggregation strategy.
609        pub aggregation: #[derive(Default)]
610        AggregationConf {
611            /// A list of key-expressions for which all included subscribers will be aggregated into.
612            subscribers: Vec<OwnedKeyExpr>,
613            /// A list of key-expressions for which all included publishers will be aggregated into.
614            publishers: Vec<OwnedKeyExpr>,
615        },
616
617        /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config)
618        pub qos: #[derive(Default)]
619        QoSConfig {
620            /// A list of QoS configurations for PUT and DELETE messages by key expressions
621            publication: PublisherQoSConfList,
622            /// Configuration of the qos overwrite interceptor rules
623            network: Vec<QosOverwriteItemConf>,
624        },
625
626        pub transport: #[derive(Default)]
627        TransportConf {
628            pub unicast: TransportUnicastConf {
629                /// Timeout in milliseconds when opening a link (default: 10000).
630                open_timeout: u64,
631                /// Timeout in milliseconds when accepting a link (default: 10000).
632                accept_timeout: u64,
633                /// Number of links that may stay pending during accept phase (default: 100).
634                accept_pending: usize,
635                /// Maximum number of unicast sessions (default: 1000)
636                max_sessions: usize,
637                /// Maximum number of unicast incoming links per transport session (default: 1)
638                /// If set to a value greater than 1, multiple outgoing links are also allowed;
639                /// otherwise, only one outgoing link is allowed.
640                /// Issue https://github.com/eclipse-zenoh/zenoh/issues/1533
641                max_links: usize,
642                /// Enables the LowLatency transport (default `false`).
643                /// This option does not make LowLatency transport mandatory, the actual implementation of transport
644                /// used will depend on Establish procedure and other party's settings
645                lowlatency: bool,
646                pub qos: QoSUnicastConf {
647                    /// Whether QoS is enabled or not.
648                    /// If set to `false`, the QoS will be disabled. (default `true`).
649                    enabled: bool
650                },
651                pub compression: CompressionUnicastConf {
652                    /// You must compile zenoh with "transport_compression" feature to be able to enable compression.
653                    /// When enabled is true, batches will be sent compressed. (default `false`).
654                    enabled: bool,
655                },
656            },
657            pub multicast: TransportMulticastConf {
658                /// Link join interval duration in milliseconds (default: 2500)
659                join_interval: Option<u64>,
660                /// Maximum number of multicast sessions (default: 1000)
661                max_sessions: Option<usize>,
662                pub qos: QoSMulticastConf {
663                    /// Whether QoS is enabled or not.
664                    /// If set to `false`, the QoS will be disabled. (default `false`).
665                    enabled: bool
666                },
667                pub compression: CompressionMulticastConf {
668                    /// You must compile zenoh with "transport_compression" feature to be able to enable compression.
669                    /// When enabled is true, batches will be sent compressed. (default `false`).
670                    enabled: bool,
671                },
672            },
673            pub link: #[derive(Default)]
674            TransportLinkConf {
675                // An optional whitelist of protocols to be used for accepting and opening sessions.
676                // If not configured, all the supported protocols are automatically whitelisted.
677                pub protocols: Option<Vec<String>>,
678                pub tx: LinkTxConf {
679                    /// The resolution in bits to be used for the message sequence numbers.
680                    /// When establishing a session with another Zenoh instance, the lowest value of the two instances will be used.
681                    /// Accepted values: 8bit, 16bit, 32bit, 64bit.
682                    sequence_number_resolution: Bits where (sequence_number_resolution_validator),
683                    /// Link lease duration in milliseconds (default: 10000)
684                    lease: u64,
685                    /// Number of keep-alive messages in a link lease duration (default: 4)
686                    keep_alive: usize,
687                    /// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1)
688                    batch_size: BatchSize,
689                    pub queue: #[derive(Default)]
690                    QueueConf {
691                        /// The size of each priority queue indicates the number of batches a given queue can contain.
692                        /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
693                        /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
694                        /// then amount of memory being allocated for each queue is SIZE_XXX * LINK_MTU.
695                        /// If qos is false, then only the DATA priority will be allocated.
696                        pub size: QueueSizeConf {
697                            control: usize,
698                            real_time: usize,
699                            interactive_high: usize,
700                            interactive_low: usize,
701                            data_high: usize,
702                            data: usize,
703                            data_low: usize,
704                            background: usize,
705                        } where (queue_size_validator),
706                        /// Congestion occurs when the queue is empty (no available batch).
707                        /// Using CongestionControl::Block the caller is blocked until a batch is available and re-inserted into the queue.
708                        /// Using CongestionControl::Drop the message might be dropped, depending on conditions configured here.
709                        pub congestion_control: #[derive(Default)]
710                        CongestionControlConf {
711                            /// Behavior pushing CongestionControl::Drop messages to the queue.
712                            pub drop: CongestionControlDropConf {
713                                /// The maximum time in microseconds to wait for an available batch before dropping a droppable message
714                                /// if still no batch is available.
715                                wait_before_drop: i64,
716                                /// The maximum deadline limit for multi-fragment messages.
717                                max_wait_before_drop_fragments: i64,
718                            },
719                            /// Behavior pushing CongestionControl::Block messages to the queue.
720                            pub block: CongestionControlBlockConf {
721                                /// The maximum time in microseconds to wait for an available batch before closing the transport session
722                                /// when sending a blocking message if still no batch is available.
723                                wait_before_close: i64,
724                            },
725                        },
726                        pub batching: BatchingConf {
727                            /// Perform adaptive batching of messages if they are smaller than the batch_size.
728                            /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
729                            /// batched together and sent all at once on the wire reducing the overall network overhead. This is typical of a high-throughput
730                            /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
731                            enabled: bool,
732                            /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
733                            time_limit: u64,
734                        },
735                        /// Perform lazy memory allocation of batches in the priority queues. If set to false all batches are initialized at
736                        /// initialization time. If set to true the batches will be allocated when needed up to the maximum number of batches
737                        /// configured in the size configuration parameter.
738                        pub allocation: #[derive(Default, Copy, PartialEq, Eq)]
739                        QueueAllocConf {
740                            pub mode: QueueAllocMode,
741                        },
742                    },
743                    // Number of threads used for TX
744                    threads: usize,
745                },
746                pub rx: LinkRxConf {
747                    /// Receiving buffer size in bytes for each link
748                    /// The default rx_buffer_size value is the same as the default batch size: 65535.
749                    /// For very high throughput scenarios, the rx_buffer_size can be increased to accommodate
750                    /// more in-flight data. This is particularly relevant when dealing with large messages.
751                    /// E.g. for 16MiB rx_buffer_size set the value to: 16777216.
752                    buffer_size: usize,
753                    /// Maximum size of the defragmentation buffer at receiver end (default: 1GiB).
754                    /// Fragmented messages that are larger than the configured size will be dropped.
755                    max_message_size: usize,
756                },
757                pub tls: #[derive(Default)]
758                TLSConf {
759                    root_ca_certificate: Option<String>,
760                    listen_private_key: Option<String>,
761                    listen_certificate: Option<String>,
762                    enable_mtls: Option<bool>,
763                    connect_private_key: Option<String>,
764                    connect_certificate: Option<String>,
765                    verify_name_on_connect: Option<bool>,
766                    close_link_on_expiration: Option<bool>,
767                    /// Configure TCP write buffer size
768                    pub so_sndbuf: Option<u32>,
769                    /// Configure TCP read buffer size
770                    pub so_rcvbuf: Option<u32>,
771                    // Skip serializing field because they contain secrets
772                    #[serde(skip_serializing)]
773                    root_ca_certificate_base64: Option<SecretValue>,
774                    #[serde(skip_serializing)]
775                    listen_private_key_base64:  Option<SecretValue>,
776                    #[serde(skip_serializing)]
777                    listen_certificate_base64: Option<SecretValue>,
778                    #[serde(skip_serializing)]
779                    connect_private_key_base64 :  Option<SecretValue>,
780                    #[serde(skip_serializing)]
781                    connect_certificate_base64 :  Option<SecretValue>,
782                },
783                pub tcp: #[derive(Default)]
784                TcpConf {
785                    /// Configure TCP write buffer size
786                    pub so_sndbuf: Option<u32>,
787                    /// Configure TCP read buffer size
788                    pub so_rcvbuf: Option<u32>,
789                },
790                pub unixpipe: #[derive(Default)]
791                UnixPipeConf {
792                    file_access_mask: Option<u32>
793                },
794            },
795            pub shared_memory:
796            ShmConf {
797                /// Whether shared memory is enabled or not.
798                /// If set to `true`, the SHM buffer optimization support will be announced to other parties. (default `true`).
799                /// This option doesn't make SHM buffer optimization mandatory, the real support depends on other party setting
800                /// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
801                /// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
802                /// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
803                enabled: bool,
804                /// SHM resources initialization mode (default "lazy").
805                /// - "lazy": SHM subsystem internals will be initialized lazily upon the first SHM buffer
806                /// allocation or reception. This setting provides better startup time and optimizes resource usage,
807                /// but produces extra latency at the first SHM buffer interaction.
808                /// - "init": SHM subsystem internals will be initialized upon Session opening. This setting sacrifices
809                /// startup time, but guarantees no latency impact when first SHM buffer is processed.
810                mode: ShmInitMode,
811                pub transport_optimization:
812                LargeMessageTransportOpt {
813                    /// Enables transport optimization for large messages (default `true`).
814                    /// Implicitly puts large messages into shared memory for transports with SHM-compatible connection.
815                    enabled: bool,
816                    /// SHM arena size in bytes used for transport optimization (default `16 * 1024 * 1024`).
817                    pool_size: NonZeroUsize,
818                    /// Allow optimization for messages equal or larger than this threshold in bytes (default `3072`).
819                    message_size_threshold: usize,
820                },
821            },
822            pub auth: #[derive(Default)]
823            AuthConf {
824                /// The configuration of authentication.
825                /// A password implies a username is required.
826                pub usrpwd: #[derive(Default)]
827                UsrPwdConf {
828                    user: Option<String>,
829                    password: Option<String>,
830                    /// The path to a file containing the user password dictionary, a file containing `<user>:<password>`
831                    dictionary_file: Option<String>,
832                } where (user_conf_validator),
833                pub pubkey: #[derive(Default)]
834                PubKeyConf {
835                    public_key_pem: Option<String>,
836                    private_key_pem: Option<String>,
837                    public_key_file: Option<String>,
838                    private_key_file: Option<String>,
839                    key_size: Option<usize>,
840                    known_keys_file: Option<String>,
841                },
842            },
843
844        },
845        /// Configuration of the admin space.
846        pub adminspace: #[derive(Default)]
847        /// <div class="stab unstable">
848        ///   <span class="emoji">🔬</span>
849        ///   This API has been marked as unstable: it works as advertised, but we may change it in a future release.
850        ///   To use it, you must enable zenoh's <code>unstable</code> feature flag.
851        /// </div>
852        AdminSpaceConf {
853            /// Enable the admin space
854            #[serde(default = "set_false")]
855            pub enabled: bool,
856            /// Permissions on the admin space
857            pub permissions:
858            PermissionsConf {
859                /// Whether the admin space replies to queries (true by default).
860                #[serde(default = "set_true")]
861                pub read: bool,
862                /// Whether the admin space accepts config changes at runtime (false by default).
863                #[serde(default = "set_false")]
864                pub write: bool,
865            },
866
867        },
868
869        /// Namespace prefix.
870        /// If not None, all outgoing key expressions will be
871        /// automatically prefixed with specified string,
872        /// and all incoming key expressions will be stripped
873        /// of specified prefix.
874        /// Namespace is applied to the session.
875        /// E. g. if session has a namespace of "1" then session.put("my/keyexpr", message),
876        /// will put a message into "1/my/keyexpr". Same applies to all other operations within this session.
877        pub namespace: Option<OwnedNonWildKeyExpr>,
878
879        /// Configuration of the downsampling.
880        downsampling: Vec<DownsamplingItemConf> where (downsampling_validator),
881
882        /// Configuration of the access control (ACL)
883        pub access_control: AclConfig {
884            pub enabled: bool,
885            pub default_permission: Permission,
886            pub rules: Option<Vec<AclConfigRule>>,
887            pub subjects: Option<Vec<AclConfigSubjects>>,
888            pub policies: Option<Vec<AclConfigPolicyEntry>>,
889        },
890
891        /// Configuration of the low-pass filter
892        pub low_pass_filter: Vec<LowPassFilterConf>,
893
894        /// Configuration of the stats per keyexpr
895        pub stats: #[derive(Default, PartialEq, Eq)] StatsConfig {
896            filters: Vec<StatsFilterConfig>,
897        },
898
899        /// A list of directories where plugins may be searched for if no `__path__` was specified for them.
900        /// The executable's current directory will be added to the search paths.
901        pub plugins_loading: #[derive(Default)]
902        PluginsLoading {
903            pub enabled: bool,
904            pub search_dirs: LibSearchDirs,
905        },
906        #[validated(recursive_accessors)]
907        /// The configuration for plugins.
908        ///
909        /// Please refer to [`PluginsConfig`]'s documentation for further details.
910        plugins: PluginsConfig,
911    }
912}
913
914#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
915#[serde(rename_all = "snake_case")]
916pub enum QueueAllocMode {
917    Init,
918    #[default]
919    Lazy,
920}
921
922#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
923#[serde(rename_all = "snake_case")]
924pub enum ShmInitMode {
925    Init,
926    #[default]
927    Lazy,
928}
929
930impl Default for PermissionsConf {
931    fn default() -> Self {
932        PermissionsConf {
933            read: true,
934            write: false,
935        }
936    }
937}
938
/// Always returns `true`; referenced by name in `#[serde(default = "set_true")]` attributes.
fn set_true() -> bool {
    true
}
/// Always returns `false`; referenced by name in `#[serde(default = "set_false")]` attributes.
fn set_false() -> bool {
    false
}
945
/// Exercises JSON5 deserialization of [`Config`]: scouting/autoconnect shorthand, user/password
/// validation, QoS overwrite and filter parsing, and finally the repository's default config file.
#[test]
fn config_deser() {
    // Deserializes a JSON5 document into a `Config`. Only the creation of the JSON5
    // deserializer is unwrapped here; configuration errors are handed back to the caller.
    let parse = |json5_src: &str| {
        Config::from_deserializer(&mut json5::Deserializer::from_str(json5_src).unwrap())
    };
    // Matcher expected by several of the autoconnect assertions below.
    let router_or_peer = WhatAmIMatcher::empty().router().peer();

    // A bare list for `autoconnect` applies to every mode.
    let config = parse(
        r#"{
        scouting: {
          multicast: {
            enabled: false,
            autoconnect: ["peer", "router"]
          }
        }
      }"#,
    )
    .unwrap();
    let multicast = config.scouting().multicast();
    assert_eq!(*multicast.enabled(), Some(false));
    assert_eq!(multicast.autoconnect().router(), Some(&router_or_peer));
    assert_eq!(multicast.autoconnect().peer(), Some(&router_or_peer));
    assert_eq!(multicast.autoconnect().client(), Some(&router_or_peer));

    // A per-mode object for `autoconnect` leaves unspecified modes unset.
    let config = parse(
        r#"{
        scouting: {
          multicast: {
            enabled: false,
            autoconnect: {router: [], peer: ["peer", "router"]}
          }
        }
      }"#,
    )
    .unwrap();
    let multicast = config.scouting().multicast();
    assert_eq!(*multicast.enabled(), Some(false));
    assert_eq!(
        multicast.autoconnect().router(),
        Some(&WhatAmIMatcher::empty())
    );
    assert_eq!(multicast.autoconnect().peer(), Some(&router_or_peer));
    assert_eq!(multicast.autoconnect().client(), None);

    // `dictionary_file` is a known key of the usrpwd configuration...
    let config = parse(
        r#"{transport: { auth: { usrpwd: { user: null, password: null, dictionary_file: "file" }}}}"#,
    )
    .unwrap();
    let dictionary_file = config.transport().auth().usrpwd().dictionary_file();
    assert_eq!(dictionary_file.as_ref().map(|s| s.as_ref()), Some("file"));

    // ...whereas `user_password_dictionary` must be rejected.
    let _ = parse(
        r#"{transport: { auth: { usrpwd: { user: null, password: null, user_password_dictionary: "file" }}}}"#,
    )
    .unwrap_err();

    // An unknown priority name is rejected.
    assert!(parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    overwrite: {
                      priority: "foo",
                    },
                  },
                ],
              }
            }"#,
    )
    .is_err());

    // A priority increment of +8 is rejected.
    assert!(parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    overwrite: {
                      priority: +8,
                    },
                  },
                ],
              }
            }"#,
    )
    .is_err());

    // A named priority is parsed as an absolute priority.
    let config = parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    overwrite: {
                      priority: "data_high",
                    },
                  },
                ],
              }
            }"#,
    )
    .unwrap();
    assert_eq!(
        config.qos().network().first().unwrap().overwrite.priority,
        Some(qos::PriorityUpdateConf::Priority(
            qos::PriorityConf::DataHigh
        ))
    );

    // A signed number is parsed as a priority increment.
    let config = parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    overwrite: {
                      priority: +1,
                    },
                  },
                ],
              }
            }"#,
    )
    .unwrap();
    assert_eq!(
        config.qos().network().first().unwrap().overwrite.priority,
        Some(qos::PriorityUpdateConf::Increment(1))
    );

    // A closed payload size range has both bounds included.
    let config = parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    payload_size: "0..99",
                    overwrite: {},
                  },
                ],
              }
            }"#,
    )
    .unwrap();
    let first_rule = config.qos().network().first().unwrap();
    assert_eq!(
        first_rule
            .payload_size
            .as_ref()
            .map(|r| (r.start_bound(), r.end_bound())),
        Some((Bound::Included(&0), Bound::Included(&99)))
    );

    // An open-ended payload size range has no upper bound.
    let config = parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    payload_size: "100..",
                    overwrite: {},
                  },
                ],
              }
            }"#,
    )
    .unwrap();
    let first_rule = config.qos().network().first().unwrap();
    assert_eq!(
        first_rule
            .payload_size
            .as_ref()
            .map(|r| (r.start_bound(), r.end_bound())),
        Some((Bound::Included(&100), Bound::Unbounded))
    );

    // A full QoS filter is parsed field by field.
    let config = parse(
        r#"{
              qos: {
                network: [
                  {
                    messages: ["put"],
                    qos: {
                      congestion_control: "drop",
                      priority: "data",
                      express: true,
                      reliability: "reliable",
                    },
                    overwrite: {},
                  },
                ],
              }
            }"#,
    )
    .unwrap();
    assert_eq!(
        config.qos().network().first().unwrap().qos,
        Some(QosFilter {
            congestion_control: Some(qos::CongestionControlConf::Drop),
            priority: Some(qos::PriorityConf::Data),
            express: Some(true),
            reliability: Some(qos::ReliabilityConf::Reliable),
        })
    );

    // The default configuration file shipped with the repository must load.
    dbg!(Config::from_file("../../DEFAULT_CONFIG.json5").unwrap());
}
1202
1203impl Config {
1204    pub fn insert<'d, D: serde::Deserializer<'d>>(
1205        &mut self,
1206        key: &str,
1207        value: D,
1208    ) -> Result<(), validated_struct::InsertionError>
1209    where
1210        validated_struct::InsertionError: From<D::Error>,
1211    {
1212        <Self as ValidatedMap>::insert(self, key, value)
1213    }
1214
1215    pub fn get(
1216        &self,
1217        key: &str,
1218    ) -> Result<<Self as ValidatedMapAssociatedTypes<'_>>::Accessor, GetError> {
1219        <Self as ValidatedMap>::get(self, key)
1220    }
1221
1222    pub fn get_json(&self, key: &str) -> Result<String, GetError> {
1223        <Self as ValidatedMap>::get_json(self, key)
1224    }
1225
1226    pub fn insert_json5(
1227        &mut self,
1228        key: &str,
1229        value: &str,
1230    ) -> Result<(), validated_struct::InsertionError> {
1231        <Self as ValidatedMap>::insert_json5(self, key, value)
1232    }
1233
1234    pub fn keys(&self) -> impl Iterator<Item = String> {
1235        <Self as ValidatedMap>::keys(self).into_iter()
1236    }
1237
1238    pub fn set_plugin_validator<T: ConfigValidator + 'static>(&mut self, validator: Weak<T>) {
1239        self.plugins.validator = validator;
1240    }
1241
1242    pub fn plugin(&self, name: &str) -> Option<&Value> {
1243        self.plugins.values.get(name)
1244    }
1245
1246    pub fn sift_privates(&self) -> Self {
1247        let mut copy = self.clone();
1248        copy.plugins.sift_privates();
1249        copy
1250    }
1251
1252    pub fn remove<K: AsRef<str>>(&mut self, key: K) -> ZResult<()> {
1253        let key = key.as_ref();
1254
1255        let key = key.strip_prefix('/').unwrap_or(key);
1256        if !key.starts_with("plugins/") {
1257            bail!(
1258                "Removal of values from Config is only supported for keys starting with `plugins/`"
1259            )
1260        }
1261        self.plugins.remove(&key["plugins/".len()..])
1262    }
1263
1264    pub fn get_retry_config(
1265        &self,
1266        endpoint: Option<&EndPoint>,
1267        listen: bool,
1268    ) -> ConnectionRetryConf {
1269        get_retry_config(self, endpoint, listen)
1270    }
1271}
1272
1273#[derive(Debug)]
1274pub enum ConfigOpenErr {
1275    IoError(std::io::Error),
1276    JsonParseErr(json5::Error),
1277    InvalidConfiguration(Box<Config>),
1278}
1279impl std::fmt::Display for ConfigOpenErr {
1280    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1281        match self {
1282            ConfigOpenErr::IoError(e) => write!(f, "Couldn't open file: {e}"),
1283            ConfigOpenErr::JsonParseErr(e) => write!(f, "JSON5 parsing error {e}"),
1284            ConfigOpenErr::InvalidConfiguration(c) => write!(
1285                f,
1286                "Invalid configuration {}",
1287                serde_json::to_string(c).unwrap()
1288            ),
1289        }
1290    }
1291}
1292impl std::error::Error for ConfigOpenErr {}
1293impl Config {
1294    pub fn from_file<P: AsRef<Path>>(path: P) -> ZResult<Self> {
1295        let path = path.as_ref();
1296        let mut config = Self::_from_file(path)?;
1297        config.plugins.load_external_configs()?;
1298        Ok(config)
1299    }
1300
1301    fn _from_file(path: &Path) -> ZResult<Config> {
1302        match std::fs::File::open(path) {
1303            Ok(mut f) => {
1304                let mut content = String::new();
1305                if let Err(e) = f.read_to_string(&mut content) {
1306                    bail!(e)
1307                }
1308                if content.is_empty() {
1309                    bail!("Empty config file");
1310                }
1311                match path
1312                    .extension()
1313                    .map(|s| s.to_str().unwrap())
1314                {
1315                    Some("json") | Some("json5") => match json5::Deserializer::from_str(&content) {
1316                        Ok(mut d) => Config::from_deserializer(&mut d).map_err(|e| match e {
1317                            Ok(c) => zerror!("Invalid configuration: {}", c).into(),
1318                            Err(e) => zerror!("JSON error: {:?}", e).into(),
1319                        }),
1320                        Err(e) => bail!(e),
1321                    },
1322                    Some("yaml") | Some("yml") => Config::from_deserializer(serde_yaml::Deserializer::from_str(&content)).map_err(|e| match e {
1323                        Ok(c) => zerror!("Invalid configuration: {}", c).into(),
1324                        Err(e) => zerror!("YAML error: {:?}", e).into(),
1325                    }),
1326                    #[cfg(feature = "unstable")]
1327                    Some("toml") => {
1328                        tracing::warn!("The TOML configuration format is unstable and may be removed in a future release");
1329                        match toml::Deserializer::parse(&content) {
1330                            Ok(de) => Config::from_deserializer(de).map_err(|e| match e {
1331                                Ok(c) => zerror!("Invalid configuration: {}", c).into(),
1332                                Err(e) => zerror!("TOML deserization error: {:?}", e).into(),
1333                            }),
1334                            Err(e) => bail!("TOML parsing error: {:?}", e),
1335                        }
1336                    },
1337                    Some(other) => bail!("Unsupported file type '.{}' (.json, .json5 and .yaml are supported)", other),
1338                    None => bail!("Unsupported file type. Configuration files must have an extension (.json, .json5 and .yaml supported)")
1339                }
1340            }
1341            Err(e) => bail!(e),
1342        }
1343    }
1344
1345    pub fn libloader(&self) -> LibLoader {
1346        if self.plugins_loading.enabled {
1347            LibLoader::new(self.plugins_loading.search_dirs().clone())
1348        } else {
1349            LibLoader::empty()
1350        }
1351    }
1352}
1353
1354impl std::fmt::Display for Config {
1355    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1356        serde_json::to_value(self)
1357            .map(|mut json| {
1358                sift_privates(&mut json);
1359                write!(f, "{json}")
1360            })
1361            .map_err(|e| {
1362                _ = write!(f, "{e:?}");
1363                fmt::Error
1364            })?
1365    }
1366}
1367
/// Builds a [`Config`] from an empty JSON object, inserts a value by key, then prints the
/// in-memory size of the configuration and its pretty-printed JSON form.
#[test]
fn config_from_json() {
    let deserializer_for = serde_json::Deserializer::from_str;

    let mut empty_object = deserializer_for(r#"{}"#);
    let mut config = Config::from_deserializer(&mut empty_object).unwrap();

    let mut lease = deserializer_for("168");
    config
        .insert("transport/link/tx/lease", &mut lease)
        .unwrap();

    dbg!(std::mem::size_of_val(&config));
    let pretty = serde_json::to_string_pretty(&config).unwrap();
    println!("{}", pretty);
}
1378
1379fn sequence_number_resolution_validator(b: &Bits) -> bool {
1380    b <= &Bits::from(TransportSn::MAX)
1381}
1382
1383fn queue_size_validator(q: &QueueSizeConf) -> bool {
1384    fn check(size: &usize) -> bool {
1385        (QueueSizeConf::MIN..=QueueSizeConf::MAX).contains(size)
1386    }
1387
1388    let QueueSizeConf {
1389        control,
1390        real_time,
1391        interactive_low,
1392        interactive_high,
1393        data_high,
1394        data,
1395        data_low,
1396        background,
1397    } = q;
1398    check(control)
1399        && check(real_time)
1400        && check(interactive_low)
1401        && check(interactive_high)
1402        && check(data_high)
1403        && check(data)
1404        && check(data_low)
1405        && check(background)
1406}
1407
1408fn user_conf_validator(u: &UsrPwdConf) -> bool {
1409    (u.password().is_none() && u.user().is_none()) || (u.password().is_some() && u.user().is_some())
1410}
1411
1412/// This part of the configuration is highly dynamic (any [`serde_json::Value`] may be put in there), but should follow this scheme:
1413/// ```javascript
1414/// plugins: {
1415///     // `plugin_name` must be unique per configuration, and will be used to find the appropriate
1416///     // dynamic library to load if no `__path__` is specified
1417///     [plugin_name]: {
1418///         // Defaults to `false`. Setting this to `true` does 2 things:
1419///         // * If `zenohd` fails to locate the requested plugin, it will crash instead of logging an error.
1420///         // * Plugins are expected to check this value to set their panic-behaviour: plugins are encouraged
1421///         //   to panic upon non-recoverable errors if their `__required__` flag is set to `true`, and to
1422///         //   simply log them otherwise
1423///         __required__: bool,
1424///         // The path(s) where the plugin is expected to be located.
1425///         // If none is specified, `zenohd` will search for a `<dylib_prefix>zenoh_plugin_<plugin_name>.<dylib_suffix>` file in the search directories.
1426///         // If any path is specified, file-search will be disabled, and the first path leading to
1427///         // an existing file will be used
1428///         __path__: string | [string],
1429///         // [plugin_name] may require additional configuration
1430///         ...
1431///     }
1432/// }
1433/// ```
1434#[derive(Clone)]
1435pub struct PluginsConfig {
1436    values: Value,
1437    validator: std::sync::Weak<dyn ConfigValidator>,
1438}
1439fn sift_privates(value: &mut serde_json::Value) {
1440    match value {
1441        Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => {}
1442        Value::Array(a) => a.iter_mut().for_each(sift_privates),
1443        Value::Object(o) => {
1444            o.remove("private");
1445            o.values_mut().for_each(sift_privates);
1446        }
1447    }
1448}
1449
1450fn load_external_plugin_config(title: &str, value: &mut Value) -> ZResult<()> {
1451    let Some(values) = value.as_object_mut() else {
1452        bail!("{} must be object", title);
1453    };
1454    recursive_include(title, values, HashSet::new(), "__config__", ".")
1455}
1456
/// A request to load one plugin, as produced by `PluginsConfig::load_requests`.
#[derive(Debug, Clone)]
pub struct PluginLoad {
    /// The key of the plugin entry in the plugins configuration.
    pub id: String,
    /// The value of the entry's `__plugin__` property when it is a string, the `id` otherwise.
    pub name: String,
    /// The path(s) given by the entry's `__path__` property, if any.
    pub paths: Option<Vec<String>>,
    /// The value of the entry's `__required__` property (`false` when absent).
    pub required: bool,
}
1464impl PluginsConfig {
1465    pub fn sift_privates(&mut self) {
1466        sift_privates(&mut self.values);
1467    }
1468    fn load_external_configs(&mut self) -> ZResult<()> {
1469        let Some(values) = self.values.as_object_mut() else {
1470            bail!("plugins configuration must be an object")
1471        };
1472        for (name, value) in values.iter_mut() {
1473            load_external_plugin_config(format!("plugins.{}", name.as_str()).as_str(), value)?;
1474        }
1475        Ok(())
1476    }
1477    pub fn load_requests(&'_ self) -> impl Iterator<Item = PluginLoad> + '_ {
1478        self.values.as_object().unwrap().iter().map(|(id, value)| {
1479            let value = value.as_object().expect("Plugin configurations must be objects");
1480            let required = match value.get("__required__") {
1481                None => false,
1482                Some(Value::Bool(b)) => *b,
1483                _ => panic!("Plugin '{id}' has an invalid '__required__' configuration property (must be a boolean)")
1484            };
1485            let name = match value.get("__plugin__") {
1486                Some(Value::String(p)) => p,
1487                _ => id,
1488            };
1489
1490            if let Some(paths) = value.get("__path__") {
1491                let paths = match paths {
1492                    Value::String(s) => vec![s.clone()],
1493                    Value::Array(a) => a.iter().map(|s| if let Value::String(s) = s { s.clone() } else { panic!("Plugin '{id}' has an invalid '__path__' configuration property (must be either string or array of strings)") }).collect(),
1494                    _ => panic!("Plugin '{id}' has an invalid '__path__' configuration property (must be either string or array of strings)")
1495                };
1496                PluginLoad { id: id.clone(), name: name.clone(), paths: Some(paths), required }
1497            } else {
1498                PluginLoad { id: id.clone(), name: name.clone(), paths: None, required }
1499            }
1500        })
1501    }
1502    pub fn remove(&mut self, key: &str) -> ZResult<()> {
1503        let mut split = key.split('/');
1504        let plugin = split.next().unwrap();
1505        let mut current = match split.next() {
1506            Some(first_in_plugin) => first_in_plugin,
1507            None => {
1508                self.values.as_object_mut().unwrap().remove(plugin);
1509                return Ok(());
1510            }
1511        };
1512        let (old_conf, mut new_conf) = match self.values.get_mut(plugin) {
1513            Some(plugin) => {
1514                let clone = plugin.clone();
1515                (plugin, clone)
1516            }
1517            None => bail!("No plugin {} to edit", plugin),
1518        };
1519        let mut remove_from = &mut new_conf;
1520        for next in split {
1521            match remove_from {
1522                Value::Object(o) => match o.get_mut(current) {
1523                    Some(v) => {
1524                        remove_from = unsafe {
1525                            std::mem::transmute::<&mut serde_json::Value, &mut serde_json::Value>(v)
1526                        }
1527                    }
1528                    None => bail!("{:?} has no {} property", o, current),
1529                },
1530                Value::Array(a) => {
1531                    let index: usize = current.parse()?;
1532                    if a.len() <= index {
1533                        bail!("{:?} cannot be indexed at {}", a, index)
1534                    }
1535                    remove_from = &mut a[index];
1536                }
1537                other => bail!("{} cannot be indexed", other),
1538            }
1539            current = next
1540        }
1541        match remove_from {
1542            Value::Object(o) => {
1543                if o.remove(current).is_none() {
1544                    bail!("{:?} has no {} property", o, current)
1545                }
1546            }
1547            Value::Array(a) => {
1548                let index: usize = current.parse()?;
1549                if a.len() <= index {
1550                    bail!("{:?} cannot be indexed at {}", a, index)
1551                }
1552                a.remove(index);
1553            }
1554            other => bail!("{} cannot be indexed", other),
1555        }
1556        let new_conf = if let Some(validator) = self.validator.upgrade() {
1557            match validator.check_config(
1558                plugin,
1559                &key[("plugins/".len() + plugin.len())..],
1560                old_conf.as_object().unwrap(),
1561                new_conf.as_object().unwrap(),
1562            )? {
1563                None => new_conf,
1564                Some(new_conf) => Value::Object(new_conf),
1565            }
1566        } else {
1567            new_conf
1568        };
1569        *old_conf = new_conf;
1570        Ok(())
1571    }
1572}
1573impl serde::Serialize for PluginsConfig {
1574    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1575    where
1576        S: serde::Serializer,
1577    {
1578        let mut value = self.values.clone();
1579        sift_privates(&mut value);
1580        value.serialize(serializer)
1581    }
1582}
1583impl Default for PluginsConfig {
1584    fn default() -> Self {
1585        Self {
1586            values: Value::Object(Default::default()),
1587            validator: std::sync::Weak::<()>::new(),
1588        }
1589    }
1590}
1591impl<'a> serde::Deserialize<'a> for PluginsConfig {
1592    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1593    where
1594        D: serde::Deserializer<'a>,
1595    {
1596        Ok(PluginsConfig {
1597            values: serde::Deserialize::deserialize(deserializer)?,
1598            validator: std::sync::Weak::<()>::new(),
1599        })
1600    }
1601}
1602
1603impl std::fmt::Debug for PluginsConfig {
1604    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1605        let mut values: Value = self.values.clone();
1606        sift_privates(&mut values);
1607        write!(f, "{values:?}")
1608    }
1609}
1610
/// Writing of a value at a `/`-separated path inside an owned value of the same type.
trait PartialMerge: Sized {
    /// Consumes `self`, stores `value` at `path` inside it and returns the updated value, or a
    /// [`validated_struct::InsertionError`] when the path cannot be followed.
    fn merge(self, path: &str, value: Self) -> Result<Self, validated_struct::InsertionError>;
}
1614impl PartialMerge for serde_json::Value {
1615    fn merge(
1616        mut self,
1617        path: &str,
1618        new_value: Self,
1619    ) -> Result<Self, validated_struct::InsertionError> {
1620        let mut value = &mut self;
1621        let mut key = path;
1622        let key_not_found = || {
1623            Err(validated_struct::InsertionError::String(format!(
1624                "{path} not found"
1625            )))
1626        };
1627        while !key.is_empty() {
1628            let (current, new_key) = validated_struct::split_once(key, '/');
1629            key = new_key;
1630            if current.is_empty() {
1631                continue;
1632            }
1633            value = match value {
1634                Value::Bool(_) | Value::Number(_) | Value::String(_) => return key_not_found(),
1635                Value::Null => match current {
1636                    "0" | "+" => {
1637                        *value = Value::Array(vec![Value::Null]);
1638                        &mut value[0]
1639                    }
1640                    _ => {
1641                        *value = Value::Object(Default::default());
1642                        value
1643                            .as_object_mut()
1644                            .unwrap()
1645                            .entry(current)
1646                            .or_insert(Value::Null)
1647                    }
1648                },
1649                Value::Array(a) => match current {
1650                    "+" => {
1651                        a.push(Value::Null);
1652                        a.last_mut().unwrap()
1653                    }
1654                    "0" if a.is_empty() => {
1655                        a.push(Value::Null);
1656                        a.last_mut().unwrap()
1657                    }
1658                    _ => match current.parse::<usize>() {
1659                        Ok(i) => match a.get_mut(i) {
1660                            Some(r) => r,
1661                            None => return key_not_found(),
1662                        },
1663                        Err(_) => return key_not_found(),
1664                    },
1665                },
1666                Value::Object(v) => v.entry(current).or_insert(Value::Null),
1667            }
1668        }
1669        *value = new_value;
1670        Ok(self)
1671    }
1672}
/// Values of the plugins configuration are handed out as type-erased references.
impl<'a> validated_struct::ValidatedMapAssociatedTypes<'a> for PluginsConfig {
    type Accessor = &'a dyn Any;
}
1676impl validated_struct::ValidatedMap for PluginsConfig {
1677    fn insert<'d, D: serde::Deserializer<'d>>(
1678        &mut self,
1679        key: &str,
1680        deserializer: D,
1681    ) -> Result<(), validated_struct::InsertionError>
1682    where
1683        validated_struct::InsertionError: From<D::Error>,
1684    {
1685        let (plugin, key) = validated_struct::split_once(key, '/');
1686        let new_value: Value = serde::Deserialize::deserialize(deserializer)?;
1687        let value = self
1688            .values
1689            .as_object_mut()
1690            .unwrap()
1691            .entry(plugin)
1692            .or_insert(Value::Null);
1693        let new_value = value.clone().merge(key, new_value)?;
1694        *value = if let Some(validator) = self.validator.upgrade() {
1695            // New plugin configuration for compare with original configuration.
1696            // Return error if it's not an object.
1697            // Note: it's ok if original "new_value" is not an object: this can be some subkey of the plugin configuration. But the result of the merge should be an object.
1698            // Error occurs  if the original plugin configuration is not an object itself (e.g. null).
1699            let Some(new_plugin_config) = new_value.as_object() else {
1700                return Err(format!(
1701                    "Attempt to provide non-object value as configuration for plugin `{plugin}`"
1702                )
1703                .into());
1704            };
1705            // Original plugin configuration for compare with new configuration.
1706            // If for some reason it's not defined or not an object, we default to an empty object.
1707            // Usually this happens when no plugin with this name defined. Reject then should be performed by the validator with `plugin not found` error.
1708            let empty_config = Map::new();
1709            let current_plugin_config = value.as_object().unwrap_or(&empty_config);
1710            match validator.check_config(plugin, key, current_plugin_config, new_plugin_config) {
1711                // Validator made changes to the proposed configuration, take these changes
1712                Ok(Some(val)) => Value::Object(val),
1713                // Validator accepted the proposed configuration as is
1714                Ok(None) => new_value,
1715                // Validator rejected the proposed configuration
1716                Err(e) => return Err(format!("{e}").into()),
1717            }
1718        } else {
1719            new_value
1720        };
1721        Ok(())
1722    }
1723    fn get<'a>(&'a self, mut key: &str) -> Result<&'a dyn Any, GetError> {
1724        let (current, new_key) = validated_struct::split_once(key, '/');
1725        key = new_key;
1726        let mut value = match self.values.get(current) {
1727            Some(matched) => matched,
1728            None => return Err(GetError::NoMatchingKey),
1729        };
1730        while !key.is_empty() {
1731            let (current, new_key) = validated_struct::split_once(key, '/');
1732            key = new_key;
1733            let matched = match value {
1734                serde_json::Value::Null
1735                | serde_json::Value::Bool(_)
1736                | serde_json::Value::Number(_)
1737                | serde_json::Value::String(_) => return Err(GetError::NoMatchingKey),
1738                serde_json::Value::Array(a) => a.get(match current.parse::<usize>() {
1739                    Ok(i) => i,
1740                    Err(_) => return Err(GetError::NoMatchingKey),
1741                }),
1742                serde_json::Value::Object(v) => v.get(current),
1743            };
1744            value = match matched {
1745                Some(matched) => matched,
1746                None => return Err(GetError::NoMatchingKey),
1747            }
1748        }
1749        Ok(value)
1750    }
1751
1752    type Keys = Vec<String>;
1753    fn keys(&self) -> Self::Keys {
1754        self.values.as_object().unwrap().keys().cloned().collect()
1755    }
1756
1757    fn get_json(&self, mut key: &str) -> Result<String, GetError> {
1758        let (current, new_key) = validated_struct::split_once(key, '/');
1759        key = new_key;
1760        let mut value = match self.values.get(current) {
1761            Some(matched) => matched,
1762            None => return Err(GetError::NoMatchingKey),
1763        };
1764        while !key.is_empty() {
1765            let (current, new_key) = validated_struct::split_once(key, '/');
1766            key = new_key;
1767            let matched = match value {
1768                serde_json::Value::Null
1769                | serde_json::Value::Bool(_)
1770                | serde_json::Value::Number(_)
1771                | serde_json::Value::String(_) => return Err(GetError::NoMatchingKey),
1772                serde_json::Value::Array(a) => a.get(match current.parse::<usize>() {
1773                    Ok(i) => i,
1774                    Err(_) => return Err(GetError::NoMatchingKey),
1775                }),
1776                serde_json::Value::Object(v) => v.get(current),
1777            };
1778            value = match matched {
1779                Some(matched) => matched,
1780                None => return Err(GetError::NoMatchingKey),
1781            }
1782        }
1783        Ok(serde_json::to_string(value).unwrap())
1784    }
1785}
1786
/// Evaluates a chain of accessor calls and falls back to the matching item of this crate's
/// `defaults` module when the result is `None`.
///
/// `unwrap_or_default!(config.a().b())` expands to
/// `config.a().b().clone().unwrap_or($crate::defaults::a::b.into())`; an accessor called with a
/// parameter, e.g. `.b(p)`, maps to the call `defaults::a::b(p)`.
///
/// The `defaults` module is reached through `$crate`, so the macro does not depend on the name
/// under which this crate is visible at the call site.
#[macro_export]
macro_rules! unwrap_or_default {
    ($val:ident$(.$field:ident($($param:ident)?))*) => {
        $val$(.$field($($param)?))*.clone().unwrap_or($crate::defaults$(::$field$(($param))?)*.into())
    };
}
1793
/// Interface of the configuration objects that [`GenericConfig`] wraps.
pub trait IConfig: Send + Sync {
    /// Returns the value stored at `key` as a string; [`GenericConfig::get_typed`] parses this
    /// string as JSON.
    fn get(&self, key: &str) -> ZResult<String>;
    /// NOTE(review): presumably the default timeout of queries, in milliseconds - confirm
    /// against the implementors.
    fn queries_default_timeout_ms(&self) -> u64;
    /// NOTE(review): presumably stores the JSON5-encoded `value` at `key` - confirm against the
    /// implementors.
    fn insert_json5(&self, key: &str, value: &str) -> ZResult<()>;
    /// Returns the configuration as a string; this is what `GenericConfig`'s `Display`
    /// implementation prints.
    fn to_json(&self) -> String;
}
1800
/// Wrapper around a shared [`IConfig`] implementation.
pub struct GenericConfig(Arc<dyn IConfig>);
1802
1803impl Deref for GenericConfig {
1804    type Target = Arc<dyn IConfig>;
1805
1806    fn deref(&self) -> &Self::Target {
1807        &self.0
1808    }
1809}
1810
1811impl GenericConfig {
1812    pub fn new(value: Arc<dyn IConfig>) -> Self {
1813        GenericConfig(value)
1814    }
1815
1816    pub fn get_typed<T: for<'a> Deserialize<'a>>(&self, key: &str) -> ZResult<T> {
1817        self.0
1818            .get(key)
1819            .and_then(|v| serde_json::from_str::<T>(&v).map_err(|e| e.into()))
1820    }
1821
1822    pub fn get_plugin_config(&self, plugin_name: &str) -> ZResult<Value> {
1823        self.get(&("plugins/".to_owned() + plugin_name))
1824            .and_then(|v| serde_json::from_str(&v).map_err(|e| e.into()))
1825    }
1826}
1827
1828impl fmt::Display for GenericConfig {
1829    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1830        f.write_str(&self.0.to_json())
1831    }
1832}
1833
#[cfg(test)]
mod tests {
    use std::{
        env,
        fs::{self, File},
        io::Write,
        process,
        str::FromStr,
        time::SystemTime,
    };

    use zenoh_protocol::core::{EndPoint, WhatAmI};

    use crate::{Config, ModeDependentValue, ZenohId};

    /// A configuration loaded from a TOML file must be the same as the equivalent configuration
    /// built through the setters.
    #[test]
    fn test_toml_config_format() {
        const FILE_CONTENTS: &str = r#"
            id = "abc"
            mode = "router"

            [listen]
            endpoints = ["tcp/localhost:7448"]

            [adminspace]
            enabled = true
        "#;

        // Nanosecond timestamp plus process id, so that concurrent or quickly repeated runs do
        // not share the same temporary file.
        let timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_nanos();

        let path = env::temp_dir().join(format!(
            "{timestamp}.{}.test.config.toml",
            process::id()
        ));

        {
            let mut tmp = File::create(&path).unwrap();
            tmp.write_all(FILE_CONTENTS.as_bytes()).unwrap();
            tmp.flush().unwrap();
        }

        let loaded_config = Config::from_file(&path);
        // Best-effort cleanup, done before any assertion so that the temporary file does not
        // outlive a failing test.
        let _ = fs::remove_file(&path);

        let expected_config = {
            let mut c = Config::default();
            c.set_id(Some(ZenohId::from_str("abc").unwrap())).unwrap();
            c.set_mode(Some(WhatAmI::Router)).unwrap();
            c.listen
                .set_endpoints(ModeDependentValue::Unique(vec![EndPoint::from_str(
                    "tcp/localhost:7448",
                )
                .unwrap()]))
                .unwrap();
            c.adminspace.set_enabled(true).unwrap();
            c
        };

        assert_eq!(
            loaded_config.unwrap().to_string(),
            expected_config.to_string()
        );
    }
}