#![allow(deprecated)]
pub mod defaults;
pub mod gateway;
mod include;
pub mod qos;
pub mod wrappers;
#[allow(unused_imports)]
use std::convert::TryFrom;
use std::{
any::Any,
collections::HashSet,
fmt,
io::Read,
net::SocketAddr,
num::{NonZeroU16, NonZeroUsize},
ops::{self, Bound, Deref, DerefMut, RangeBounds},
path::Path,
sync::{Arc, Weak},
};
use include::recursive_include;
use nonempty_collections::NEVec;
use qos::{PublisherQoSConfList, QosFilter, QosOverwriteMessage, QosOverwrites};
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use validated_struct::ValidatedMapAssociatedTypes;
pub use validated_struct::{GetError, ValidatedMap};
pub use wrappers::ZenohId;
pub use zenoh_protocol::core::{
whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor,
};
use zenoh_protocol::{
core::{
key_expr::{OwnedKeyExpr, OwnedNonWildKeyExpr},
Bits, RegionName,
},
transport::{BatchSize, TransportSn},
};
use zenoh_result::{bail, zerror, ZResult};
use zenoh_util::{LibLoader, LibSearchDirs};
pub mod mode_dependent;
pub use mode_dependent::*;
pub mod connection_retry;
pub use connection_retry::*;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct SecretString(String);
impl ops::Deref for SecretString {
type Target = String;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl SerializableSecret for SecretString {}
impl DebugSecret for SecretString {}
impl CloneableSecret for SecretString {}
impl Zeroize for SecretString {
fn zeroize(&mut self) {
self.0 = "".to_string();
}
}
pub type SecretValue = Secret<SecretString>;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TransportWeight {
pub dst_zid: ZenohId,
pub weight: NonZeroU16,
}
#[derive(Debug, Deserialize, Serialize, Clone, Copy, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum InterceptorFlow {
Egress,
Ingress,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum DownsamplingMessage {
Delete,
#[deprecated = "Use `Put` or `Delete` instead."]
Push,
Put,
Query,
Reply,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct DownsamplingRuleConf {
pub key_expr: OwnedKeyExpr,
pub freq: f64,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct DownsamplingItemConf {
pub id: Option<String>,
pub interfaces: Option<NEVec<String>>,
pub link_protocols: Option<NEVec<InterceptorLink>>,
pub messages: NEVec<DownsamplingMessage>,
pub rules: NEVec<DownsamplingRuleConf>,
pub flows: Option<NEVec<InterceptorFlow>>,
}
fn downsampling_validator(d: &Vec<DownsamplingItemConf>) -> bool {
for item in d {
if item
.messages
.iter()
.any(|m| *m == DownsamplingMessage::Push)
{
tracing::warn!("In 'downsampling/messages' configuration: 'push' is deprecated and may not be supported in future versions, use 'put' and/or 'delete' instead");
}
}
true
}
#[derive(Serialize, Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct LowPassFilterConf {
pub id: Option<String>,
pub interfaces: Option<NEVec<String>>,
pub link_protocols: Option<NEVec<InterceptorLink>>,
pub flows: Option<NEVec<InterceptorFlow>>,
pub messages: NEVec<LowPassFilterMessage>,
pub key_exprs: NEVec<OwnedKeyExpr>,
pub size_limit: usize,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum LowPassFilterMessage {
Put,
Delete,
Query,
Reply,
}
#[derive(Serialize, Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct AclConfigRule {
pub id: String,
pub key_exprs: NEVec<OwnedKeyExpr>,
pub messages: NEVec<AclMessage>,
pub flows: Option<NEVec<InterceptorFlow>>,
pub permission: Permission,
}
#[derive(Serialize, Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct AclConfigSubjects {
pub id: String,
pub interfaces: Option<NEVec<Interface>>,
pub cert_common_names: Option<NEVec<CertCommonName>>,
pub usernames: Option<NEVec<Username>>,
pub link_protocols: Option<NEVec<InterceptorLink>>,
pub zids: Option<NEVec<ZenohId>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfRange {
start: Option<u64>,
end: Option<u64>,
}
impl ConfRange {
pub fn new(start: Option<u64>, end: Option<u64>) -> Self {
Self { start, end }
}
}
impl RangeBounds<u64> for ConfRange {
fn start_bound(&self) -> Bound<&u64> {
match self.start {
Some(ref start) => Bound::Included(start),
None => Bound::Unbounded,
}
}
fn end_bound(&self) -> Bound<&u64> {
match self.end {
Some(ref end) => Bound::Included(end),
None => Bound::Unbounded,
}
}
}
impl serde::Serialize for ConfRange {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&format!(
"{}..{}",
self.start.unwrap_or_default(),
self.end.unwrap_or_default()
))
}
}
impl<'a> serde::Deserialize<'a> for ConfRange {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
struct V;
impl serde::de::Visitor<'_> for V {
type Value = ConfRange;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("range string")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let (start, end) = v
.split_once("..")
.ok_or_else(|| serde::de::Error::custom("invalid range"))?;
let parse_bound = |bound: &str| {
(!bound.is_empty())
.then(|| bound.parse::<u64>())
.transpose()
.map_err(|_| serde::de::Error::custom("invalid range bound"))
};
Ok(ConfRange::new(parse_bound(start)?, parse_bound(end)?))
}
}
deserializer.deserialize_str(V)
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct QosOverwriteItemConf {
pub id: Option<String>,
pub zids: Option<NEVec<ZenohId>>,
pub interfaces: Option<NEVec<String>>,
pub link_protocols: Option<NEVec<InterceptorLink>>,
pub messages: NEVec<QosOverwriteMessage>,
pub key_exprs: Option<NEVec<OwnedKeyExpr>>,
pub overwrite: QosOverwrites,
pub flows: Option<NEVec<InterceptorFlow>>,
pub qos: Option<QosFilter>,
pub payload_size: Option<ConfRange>,
}
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct Interface(pub String);
impl std::fmt::Display for Interface {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Interface({})", self.0)
}
}
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct CertCommonName(pub String);
impl std::fmt::Display for CertCommonName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CertCommonName({})", self.0)
}
}
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct Username(pub String);
impl std::fmt::Display for Username {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Username({})", self.0)
}
}
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
#[serde(rename_all = "kebab-case")]
pub enum InterceptorLink {
Tcp,
Udp,
Tls,
Quic,
Serial,
Unixpipe,
UnixsockStream,
Vsock,
Ws,
}
impl std::fmt::Display for InterceptorLink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Transport({self:?})")
}
}
#[derive(Serialize, Debug, Deserialize, Clone, PartialEq, Eq, Hash)]
#[serde(deny_unknown_fields)]
pub struct AclConfigPolicyEntry {
pub id: Option<String>,
pub rules: Vec<String>,
pub subjects: Vec<String>,
}
#[derive(Clone, Serialize, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PolicyRule {
pub subject_id: usize,
pub key_expr: OwnedKeyExpr,
pub message: AclMessage,
pub permission: Permission,
pub flow: InterceptorFlow,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AclMessage {
Put,
Delete,
DeclareSubscriber,
Query,
DeclareQueryable,
Reply,
LivelinessToken,
DeclareLivelinessSubscriber,
LivelinessQuery,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Permission {
Allow,
Deny,
}
#[derive(Default, Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum AutoConnectStrategy {
#[default]
Always,
GreaterZid,
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct StatsFilterConfig {
pub key: OwnedKeyExpr,
}
pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
_plugin_name: &str,
_path: &str,
_current: &serde_json::Map<String, serde_json::Value>,
_new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
Ok(None)
}
}
impl ConfigValidator for () {}
pub fn empty() -> Config {
Config::default()
}
pub fn default() -> Config {
peer()
}
pub fn peer() -> Config {
let mut config = Config::default();
config.set_mode(Some(WhatAmI::Peer)).unwrap();
config
}
pub fn client<I: IntoIterator<Item = T>, T: Into<EndPoint>>(peers: I) -> Config {
let mut config = Config::default();
config.set_mode(Some(WhatAmI::Client)).unwrap();
config.connect.endpoints =
ModeDependentValue::Unique(peers.into_iter().map(|t| t.into()).collect());
config
}
#[test]
fn config_keys() {
let c = Config::default();
dbg!(Vec::from_iter(c.keys()));
}
#[derive(Clone, Debug, Default)]
struct DeprecatedPeersFailoverBrokering(Option<bool>);
impl serde::Serialize for DeprecatedPeersFailoverBrokering {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.0.serialize(serializer)
}
}
impl<'de> serde::Deserialize<'de> for DeprecatedPeersFailoverBrokering {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
tracing::warn!(
"`routing.router.peers_failover_brokering` is deprecated and has no effect; \
please remove it from your configuration"
);
Option::<bool>::deserialize(deserializer).map(Self)
}
}
#[derive(Clone, Debug, Default)]
struct DeprecatedRoutingPeer(Option<Value>);
impl serde::Serialize for DeprecatedRoutingPeer {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.0.serialize(serializer)
}
}
impl<'de> serde::Deserialize<'de> for DeprecatedRoutingPeer {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
tracing::warn!(
"routing.peer.mode` and `routing.peer.linkstate` are deprecated and have no effect; \
please remove them from your configuration"
);
Option::<Value>::deserialize(deserializer).map(Self)
}
}
validated_struct::validator! {
#[derive(Default)]
#[recursive_attrs]
#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)]
#[serde(default)]
#[serde(deny_unknown_fields)]
#[doc(hidden)]
Config {
id: Option<ZenohId>,
metadata: Value,
mode: Option<whatami::WhatAmI>,
region_name: Option<RegionName>,
pub gateway: gateway::GatewayConf,
pub connect:
ConnectConfig {
pub timeout_ms: Option<ModeDependentValue<i64>>,
pub endpoints: ModeDependentValue<Vec<EndPoint>>,
pub exit_on_failure: Option<ModeDependentValue<bool>>,
pub retry: Option<connection_retry::ConnectionRetryModeDependentConf>,
},
pub listen:
ListenConfig {
pub timeout_ms: Option<ModeDependentValue<i64>>,
pub endpoints: ModeDependentValue<Vec<EndPoint>>,
pub exit_on_failure: Option<ModeDependentValue<bool>>,
pub retry: Option<connection_retry::ConnectionRetryModeDependentConf>,
},
pub open: #[derive(Default)]
OpenConf {
pub return_conditions: #[derive(Default)]
ReturnConditionsConf {
connect_scouted: Option<bool>,
declares: Option<bool>,
},
},
pub scouting: #[derive(Default)]
ScoutingConf {
timeout: Option<u64>,
delay: Option<u64>,
pub multicast: #[derive(Default)]
ScoutingMulticastConf {
enabled: Option<bool>,
address: Option<SocketAddr>,
interface: Option<String>,
pub ttl: Option<u32>,
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
autoconnect_strategy: Option<ModeDependentValue<TargetDependentValue<AutoConnectStrategy>>>,
listen: Option<ModeDependentValue<bool>>,
},
pub gossip: #[derive(Default)]
GossipConf {
enabled: Option<bool>,
multihop: Option<bool>,
target: Option<ModeDependentValue<WhatAmIMatcher>>,
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
autoconnect_strategy: Option<ModeDependentValue<TargetDependentValue<AutoConnectStrategy>>>,
},
},
pub timestamping: #[derive(Default)]
TimestampingConf {
enabled: Option<ModeDependentValue<bool>>,
drop_future_timestamp: Option<bool>,
},
queries_default_timeout: Option<u64>,
pub routing: #[derive(Default)]
RoutingConf {
pub router: #[derive(Default)]
RouterRoutingConf {
#[serde(default, skip_serializing)]
peers_failover_brokering: DeprecatedPeersFailoverBrokering,
pub linkstate: #[derive(Default)]
LinkstateConf {
pub transport_weights: Vec<TransportWeight>,
},
},
#[serde(default, skip_serializing)]
peer: DeprecatedRoutingPeer,
pub interests: #[derive(Default)]
InterestsConf {
timeout: Option<u64>,
},
},
pub aggregation: #[derive(Default)]
AggregationConf {
subscribers: Vec<OwnedKeyExpr>,
publishers: Vec<OwnedKeyExpr>,
},
pub qos: #[derive(Default)]
QoSConfig {
publication: PublisherQoSConfList,
network: Vec<QosOverwriteItemConf>,
},
pub transport: #[derive(Default)]
TransportConf {
pub unicast: TransportUnicastConf {
open_timeout: u64,
accept_timeout: u64,
accept_pending: usize,
max_sessions: usize,
max_links: usize,
lowlatency: bool,
pub qos: QoSUnicastConf {
enabled: bool
},
pub compression: CompressionUnicastConf {
enabled: bool,
},
},
pub multicast: TransportMulticastConf {
join_interval: Option<u64>,
max_sessions: Option<usize>,
pub qos: QoSMulticastConf {
enabled: bool
},
pub compression: CompressionMulticastConf {
enabled: bool,
},
},
pub link: #[derive(Default)]
TransportLinkConf {
pub protocols: Option<Vec<String>>,
pub tx: LinkTxConf {
sequence_number_resolution: Bits where (sequence_number_resolution_validator),
lease: u64,
keep_alive: usize,
batch_size: BatchSize,
pub queue: #[derive(Default)]
QueueConf {
pub size: QueueSizeConf {
control: usize,
real_time: usize,
interactive_high: usize,
interactive_low: usize,
data_high: usize,
data: usize,
data_low: usize,
background: usize,
} where (queue_size_validator),
pub congestion_control: #[derive(Default)]
CongestionControlConf {
pub drop: CongestionControlDropConf {
wait_before_drop: i64,
max_wait_before_drop_fragments: i64,
},
pub block: CongestionControlBlockConf {
wait_before_close: i64,
},
},
pub batching: BatchingConf {
enabled: bool,
time_limit: u64,
},
pub allocation: #[derive(Default, Copy, PartialEq, Eq)]
QueueAllocConf {
pub mode: QueueAllocMode,
},
},
threads: usize,
},
pub rx: LinkRxConf {
buffer_size: usize,
max_message_size: usize,
},
pub tls: #[derive(Default)]
TLSConf {
root_ca_certificate: Option<String>,
listen_private_key: Option<String>,
listen_certificate: Option<String>,
enable_mtls: Option<bool>,
connect_private_key: Option<String>,
connect_certificate: Option<String>,
verify_name_on_connect: Option<bool>,
close_link_on_expiration: Option<bool>,
pub so_sndbuf: Option<u32>,
pub so_rcvbuf: Option<u32>,
#[serde(skip_serializing)]
root_ca_certificate_base64: Option<SecretValue>,
#[serde(skip_serializing)]
listen_private_key_base64: Option<SecretValue>,
#[serde(skip_serializing)]
listen_certificate_base64: Option<SecretValue>,
#[serde(skip_serializing)]
connect_private_key_base64 : Option<SecretValue>,
#[serde(skip_serializing)]
connect_certificate_base64 : Option<SecretValue>,
},
pub tcp: #[derive(Default)]
TcpConf {
pub so_sndbuf: Option<u32>,
pub so_rcvbuf: Option<u32>,
},
pub unixpipe: #[derive(Default)]
UnixPipeConf {
file_access_mask: Option<u32>
},
},
pub shared_memory:
ShmConf {
enabled: bool,
mode: ShmInitMode,
pub transport_optimization:
LargeMessageTransportOpt {
enabled: bool,
pool_size: NonZeroUsize,
message_size_threshold: usize,
},
},
pub auth: #[derive(Default)]
AuthConf {
pub usrpwd: #[derive(Default)]
UsrPwdConf {
user: Option<String>,
password: Option<String>,
dictionary_file: Option<String>,
} where (user_conf_validator),
pub pubkey: #[derive(Default)]
PubKeyConf {
public_key_pem: Option<String>,
private_key_pem: Option<String>,
public_key_file: Option<String>,
private_key_file: Option<String>,
key_size: Option<usize>,
known_keys_file: Option<String>,
},
},
},
pub adminspace: #[derive(Default)]
AdminSpaceConf {
#[serde(default = "set_false")]
pub enabled: bool,
pub permissions:
PermissionsConf {
#[serde(default = "set_true")]
pub read: bool,
#[serde(default = "set_false")]
pub write: bool,
},
},
pub namespace: Option<OwnedNonWildKeyExpr>,
downsampling: Vec<DownsamplingItemConf> where (downsampling_validator),
pub access_control: AclConfig {
pub enabled: bool,
pub default_permission: Permission,
pub rules: Option<Vec<AclConfigRule>>,
pub subjects: Option<Vec<AclConfigSubjects>>,
pub policies: Option<Vec<AclConfigPolicyEntry>>,
},
pub low_pass_filter: Vec<LowPassFilterConf>,
pub stats: #[derive(Default, PartialEq, Eq)] StatsConfig {
filters: Vec<StatsFilterConfig>,
},
pub plugins_loading: #[derive(Default)]
PluginsLoading {
pub enabled: bool,
pub search_dirs: LibSearchDirs,
},
#[validated(recursive_accessors)]
plugins: PluginsConfig,
}
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueAllocMode {
Init,
#[default]
Lazy,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ShmInitMode {
Init,
#[default]
Lazy,
}
impl Default for PermissionsConf {
fn default() -> Self {
PermissionsConf {
read: true,
write: false,
}
}
}
fn set_true() -> bool {
true
}
fn set_false() -> bool {
false
}
#[test]
fn config_deser() {
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
scouting: {
multicast: {
enabled: false,
autoconnect: ["peer", "router"]
}
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(*config.scouting().multicast().enabled(), Some(false));
assert_eq!(
config.scouting().multicast().autoconnect().router(),
Some(&WhatAmIMatcher::empty().router().peer())
);
assert_eq!(
config.scouting().multicast().autoconnect().peer(),
Some(&WhatAmIMatcher::empty().router().peer())
);
assert_eq!(
config.scouting().multicast().autoconnect().client(),
Some(&WhatAmIMatcher::empty().router().peer())
);
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
scouting: {
multicast: {
enabled: false,
autoconnect: {router: [], peer: ["peer", "router"]}
}
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(*config.scouting().multicast().enabled(), Some(false));
assert_eq!(
config.scouting().multicast().autoconnect().router(),
Some(&WhatAmIMatcher::empty())
);
assert_eq!(
config.scouting().multicast().autoconnect().peer(),
Some(&WhatAmIMatcher::empty().router().peer())
);
assert_eq!(config.scouting().multicast().autoconnect().client(), None);
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{transport: { auth: { usrpwd: { user: null, password: null, dictionary_file: "file" }}}}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(
config
.transport()
.auth()
.usrpwd()
.dictionary_file()
.as_ref()
.map(|s| s.as_ref()),
Some("file")
);
std::mem::drop(Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{transport: { auth: { usrpwd: { user: null, password: null, user_password_dictionary: "file" }}}}"#,
)
.unwrap(),
)
.unwrap_err());
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
overwrite: {
priority: "foo",
},
},
],
}
}"#,
)
.unwrap(),
);
assert!(config.is_err());
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
overwrite: {
priority: +8,
},
},
],
}
}"#,
)
.unwrap(),
);
assert!(config.is_err());
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
overwrite: {
priority: "data_high",
},
},
],
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(
config.qos().network().first().unwrap().overwrite.priority,
Some(qos::PriorityUpdateConf::Priority(
qos::PriorityConf::DataHigh
))
);
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
overwrite: {
priority: +1,
},
},
],
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(
config.qos().network().first().unwrap().overwrite.priority,
Some(qos::PriorityUpdateConf::Increment(1))
);
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
payload_size: "0..99",
overwrite: {},
},
],
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(
config
.qos()
.network()
.first()
.unwrap()
.payload_size
.as_ref()
.map(|r| (r.start_bound(), r.end_bound())),
Some((Bound::Included(&0), Bound::Included(&99)))
);
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
payload_size: "100..",
overwrite: {},
},
],
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(
config
.qos()
.network()
.first()
.unwrap()
.payload_size
.as_ref()
.map(|r| (r.start_bound(), r.end_bound())),
Some((Bound::Included(&100), Bound::Unbounded))
);
let config = Config::from_deserializer(
&mut json5::Deserializer::from_str(
r#"{
qos: {
network: [
{
messages: ["put"],
qos: {
congestion_control: "drop",
priority: "data",
express: true,
reliability: "reliable",
},
overwrite: {},
},
],
}
}"#,
)
.unwrap(),
)
.unwrap();
assert_eq!(
config.qos().network().first().unwrap().qos,
Some(QosFilter {
congestion_control: Some(qos::CongestionControlConf::Drop),
priority: Some(qos::PriorityConf::Data),
express: Some(true),
reliability: Some(qos::ReliabilityConf::Reliable),
})
);
dbg!(Config::from_file("../../DEFAULT_CONFIG.json5").unwrap());
}
impl Config {
pub fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
value: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
<Self as ValidatedMap>::insert(self, key, value)
}
pub fn get(
&self,
key: &str,
) -> Result<<Self as ValidatedMapAssociatedTypes<'_>>::Accessor, GetError> {
<Self as ValidatedMap>::get(self, key)
}
pub fn get_json(&self, key: &str) -> Result<String, GetError> {
<Self as ValidatedMap>::get_json(self, key)
}
pub fn insert_json5(
&mut self,
key: &str,
value: &str,
) -> Result<(), validated_struct::InsertionError> {
<Self as ValidatedMap>::insert_json5(self, key, value)
}
pub fn keys(&self) -> impl Iterator<Item = String> {
<Self as ValidatedMap>::keys(self).into_iter()
}
pub fn set_plugin_validator<T: ConfigValidator + 'static>(&mut self, validator: Weak<T>) {
self.plugins.validator = validator;
}
pub fn plugin(&self, name: &str) -> Option<&Value> {
self.plugins.values.get(name)
}
pub fn sift_privates(&self) -> Self {
let mut copy = self.clone();
copy.plugins.sift_privates();
copy
}
pub fn remove<K: AsRef<str>>(&mut self, key: K) -> ZResult<()> {
let key = key.as_ref();
let key = key.strip_prefix('/').unwrap_or(key);
if !key.starts_with("plugins/") {
bail!(
"Removal of values from Config is only supported for keys starting with `plugins/`"
)
}
self.plugins.remove(&key["plugins/".len()..])
}
pub fn get_retry_config(
&self,
endpoint: Option<&EndPoint>,
listen: bool,
) -> ConnectionRetryConf {
get_retry_config(self, endpoint, listen)
}
}
#[derive(Debug)]
pub enum ConfigOpenErr {
IoError(std::io::Error),
JsonParseErr(json5::Error),
InvalidConfiguration(Box<Config>),
}
impl std::fmt::Display for ConfigOpenErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConfigOpenErr::IoError(e) => write!(f, "Couldn't open file: {e}"),
ConfigOpenErr::JsonParseErr(e) => write!(f, "JSON5 parsing error {e}"),
ConfigOpenErr::InvalidConfiguration(c) => write!(
f,
"Invalid configuration {}",
serde_json::to_string(c).unwrap()
),
}
}
}
impl std::error::Error for ConfigOpenErr {}
impl Config {
pub fn from_file<P: AsRef<Path>>(path: P) -> ZResult<Self> {
let path = path.as_ref();
let mut config = Self::_from_file(path)?;
config.plugins.load_external_configs()?;
Ok(config)
}
fn _from_file(path: &Path) -> ZResult<Config> {
match std::fs::File::open(path) {
Ok(mut f) => {
let mut content = String::new();
if let Err(e) = f.read_to_string(&mut content) {
bail!(e)
}
if content.is_empty() {
bail!("Empty config file");
}
match path
.extension()
.map(|s| s.to_str().unwrap())
{
Some("json") | Some("json5") => match json5::Deserializer::from_str(&content) {
Ok(mut d) => Config::from_deserializer(&mut d).map_err(|e| match e {
Ok(c) => zerror!("Invalid configuration: {}", c).into(),
Err(e) => zerror!("JSON error: {:?}", e).into(),
}),
Err(e) => bail!(e),
},
Some("yaml") | Some("yml") => Config::from_deserializer(serde_yaml::Deserializer::from_str(&content)).map_err(|e| match e {
Ok(c) => zerror!("Invalid configuration: {}", c).into(),
Err(e) => zerror!("YAML error: {:?}", e).into(),
}),
#[cfg(feature = "unstable")]
Some("toml") => {
tracing::warn!("The TOML configuration format is unstable and may be removed in a future release");
match toml::Deserializer::parse(&content) {
Ok(de) => Config::from_deserializer(de).map_err(|e| match e {
Ok(c) => zerror!("Invalid configuration: {}", c).into(),
Err(e) => zerror!("TOML deserization error: {:?}", e).into(),
}),
Err(e) => bail!("TOML parsing error: {:?}", e),
}
},
Some(other) => bail!("Unsupported file type '.{}' (.json, .json5 and .yaml are supported)", other),
None => bail!("Unsupported file type. Configuration files must have an extension (.json, .json5 and .yaml supported)")
}
}
Err(e) => bail!(e),
}
}
pub fn libloader(&self) -> LibLoader {
if self.plugins_loading.enabled {
LibLoader::new(self.plugins_loading.search_dirs().clone())
} else {
LibLoader::empty()
}
}
pub fn expanded(mut self) -> ExpandedConfig {
if self.id.is_none() {
self.set_id(Some(ZenohId::default())).unwrap();
}
if self.mode.is_none() {
self.set_mode(Some(WhatAmI::default())).unwrap();
}
ExpandedConfig(self)
}
}
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ExpandedConfig(Config);
impl ExpandedConfig {
pub fn id(&self) -> ZenohId {
self.0.id.unwrap()
}
pub fn mode(&self) -> WhatAmI {
self.0.mode.unwrap()
}
}
impl Deref for ExpandedConfig {
type Target = Config;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for ExpandedConfig {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl std::fmt::Display for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
serde_json::to_value(self)
.map(|mut json| {
sift_privates(&mut json);
write!(f, "{json}")
})
.map_err(|e| {
_ = write!(f, "{e:?}");
fmt::Error
})?
}
}
#[test]
fn config_from_json() {
let from_str = serde_json::Deserializer::from_str;
let mut config = Config::from_deserializer(&mut from_str(r#"{}"#)).unwrap();
config
.insert("transport/link/tx/lease", &mut from_str("168"))
.unwrap();
dbg!(std::mem::size_of_val(&config));
println!("{}", serde_json::to_string_pretty(&config).unwrap());
}
fn sequence_number_resolution_validator(b: &Bits) -> bool {
b <= &Bits::from(TransportSn::MAX)
}
fn queue_size_validator(q: &QueueSizeConf) -> bool {
fn check(size: &usize) -> bool {
(QueueSizeConf::MIN..=QueueSizeConf::MAX).contains(size)
}
let QueueSizeConf {
control,
real_time,
interactive_low,
interactive_high,
data_high,
data,
data_low,
background,
} = q;
check(control)
&& check(real_time)
&& check(interactive_low)
&& check(interactive_high)
&& check(data_high)
&& check(data)
&& check(data_low)
&& check(background)
}
fn user_conf_validator(u: &UsrPwdConf) -> bool {
(u.password().is_none() && u.user().is_none()) || (u.password().is_some() && u.user().is_some())
}
#[derive(Clone)]
pub struct PluginsConfig {
values: Value,
validator: std::sync::Weak<dyn ConfigValidator>,
}
fn sift_privates(value: &mut serde_json::Value) {
match value {
Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => {}
Value::Array(a) => a.iter_mut().for_each(sift_privates),
Value::Object(o) => {
o.remove("private");
o.values_mut().for_each(sift_privates);
}
}
}
fn load_external_plugin_config(title: &str, value: &mut Value) -> ZResult<()> {
let Some(values) = value.as_object_mut() else {
bail!("{} must be object", title);
};
recursive_include(title, values, HashSet::new(), "__config__", ".")
}
#[derive(Debug, Clone)]
pub struct PluginLoad {
pub id: String,
pub name: String,
pub paths: Option<Vec<String>>,
pub required: bool,
}
impl PluginsConfig {
pub fn sift_privates(&mut self) {
sift_privates(&mut self.values);
}
fn load_external_configs(&mut self) -> ZResult<()> {
let Some(values) = self.values.as_object_mut() else {
bail!("plugins configuration must be an object")
};
for (name, value) in values.iter_mut() {
load_external_plugin_config(format!("plugins.{}", name.as_str()).as_str(), value)?;
}
Ok(())
}
pub fn load_requests(&'_ self) -> impl Iterator<Item = PluginLoad> + '_ {
self.values.as_object().unwrap().iter().map(|(id, value)| {
let value = value.as_object().expect("Plugin configurations must be objects");
let required = match value.get("__required__") {
None => false,
Some(Value::Bool(b)) => *b,
_ => panic!("Plugin '{id}' has an invalid '__required__' configuration property (must be a boolean)")
};
let name = match value.get("__plugin__") {
Some(Value::String(p)) => p,
_ => id,
};
if let Some(paths) = value.get("__path__") {
let paths = match paths {
Value::String(s) => vec![s.clone()],
Value::Array(a) => a.iter().map(|s| if let Value::String(s) = s { s.clone() } else { panic!("Plugin '{id}' has an invalid '__path__' configuration property (must be either string or array of strings)") }).collect(),
_ => panic!("Plugin '{id}' has an invalid '__path__' configuration property (must be either string or array of strings)")
};
PluginLoad { id: id.clone(), name: name.clone(), paths: Some(paths), required }
} else {
PluginLoad { id: id.clone(), name: name.clone(), paths: None, required }
}
})
}
pub fn remove(&mut self, key: &str) -> ZResult<()> {
let mut split = key.split('/');
let plugin = split.next().unwrap();
let mut current = match split.next() {
Some(first_in_plugin) => first_in_plugin,
None => {
self.values.as_object_mut().unwrap().remove(plugin);
return Ok(());
}
};
let (old_conf, mut new_conf) = match self.values.get_mut(plugin) {
Some(plugin) => {
let clone = plugin.clone();
(plugin, clone)
}
None => bail!("No plugin {} to edit", plugin),
};
let mut remove_from = &mut new_conf;
for next in split {
match remove_from {
Value::Object(o) => match o.get_mut(current) {
Some(v) => {
remove_from = unsafe {
std::mem::transmute::<&mut serde_json::Value, &mut serde_json::Value>(v)
}
}
None => bail!("{:?} has no {} property", o, current),
},
Value::Array(a) => {
let index: usize = current.parse()?;
if a.len() <= index {
bail!("{:?} cannot be indexed at {}", a, index)
}
remove_from = &mut a[index];
}
other => bail!("{} cannot be indexed", other),
}
current = next
}
match remove_from {
Value::Object(o) => {
if o.remove(current).is_none() {
bail!("{:?} has no {} property", o, current)
}
}
Value::Array(a) => {
let index: usize = current.parse()?;
if a.len() <= index {
bail!("{:?} cannot be indexed at {}", a, index)
}
a.remove(index);
}
other => bail!("{} cannot be indexed", other),
}
let new_conf = if let Some(validator) = self.validator.upgrade() {
match validator.check_config(
plugin,
&key[("plugins/".len() + plugin.len())..],
old_conf.as_object().unwrap(),
new_conf.as_object().unwrap(),
)? {
None => new_conf,
Some(new_conf) => Value::Object(new_conf),
}
} else {
new_conf
};
*old_conf = new_conf;
Ok(())
}
}
impl serde::Serialize for PluginsConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut value = self.values.clone();
sift_privates(&mut value);
value.serialize(serializer)
}
}
impl Default for PluginsConfig {
fn default() -> Self {
Self {
values: Value::Object(Default::default()),
validator: std::sync::Weak::<()>::new(),
}
}
}
impl<'a> serde::Deserialize<'a> for PluginsConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
Ok(PluginsConfig {
values: serde::Deserialize::deserialize(deserializer)?,
validator: std::sync::Weak::<()>::new(),
})
}
}
impl std::fmt::Debug for PluginsConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut values: Value = self.values.clone();
sift_privates(&mut values);
write!(f, "{values:?}")
}
}
trait PartialMerge: Sized {
fn merge(self, path: &str, value: Self) -> Result<Self, validated_struct::InsertionError>;
}
impl PartialMerge for serde_json::Value {
fn merge(
mut self,
path: &str,
new_value: Self,
) -> Result<Self, validated_struct::InsertionError> {
let mut value = &mut self;
let mut key = path;
let key_not_found = || {
Err(validated_struct::InsertionError::String(format!(
"{path} not found"
)))
};
while !key.is_empty() {
let (current, new_key) = validated_struct::split_once(key, '/');
key = new_key;
if current.is_empty() {
continue;
}
value = match value {
Value::Bool(_) | Value::Number(_) | Value::String(_) => return key_not_found(),
Value::Null => match current {
"0" | "+" => {
*value = Value::Array(vec![Value::Null]);
&mut value[0]
}
_ => {
*value = Value::Object(Default::default());
value
.as_object_mut()
.unwrap()
.entry(current)
.or_insert(Value::Null)
}
},
Value::Array(a) => match current {
"+" => {
a.push(Value::Null);
a.last_mut().unwrap()
}
"0" if a.is_empty() => {
a.push(Value::Null);
a.last_mut().unwrap()
}
_ => match current.parse::<usize>() {
Ok(i) => match a.get_mut(i) {
Some(r) => r,
None => return key_not_found(),
},
Err(_) => return key_not_found(),
},
},
Value::Object(v) => v.entry(current).or_insert(Value::Null),
}
}
*value = new_value;
Ok(self)
}
}
impl<'a> validated_struct::ValidatedMapAssociatedTypes<'a> for PluginsConfig {
type Accessor = &'a dyn Any;
}
impl validated_struct::ValidatedMap for PluginsConfig {
fn insert<'d, D: serde::Deserializer<'d>>(
&mut self,
key: &str,
deserializer: D,
) -> Result<(), validated_struct::InsertionError>
where
validated_struct::InsertionError: From<D::Error>,
{
let (plugin, key) = validated_struct::split_once(key, '/');
let new_value: Value = serde::Deserialize::deserialize(deserializer)?;
let value = self
.values
.as_object_mut()
.unwrap()
.entry(plugin)
.or_insert(Value::Null);
let new_value = value.clone().merge(key, new_value)?;
*value = if let Some(validator) = self.validator.upgrade() {
let Some(new_plugin_config) = new_value.as_object() else {
return Err(format!(
"Attempt to provide non-object value as configuration for plugin `{plugin}`"
)
.into());
};
let empty_config = Map::new();
let current_plugin_config = value.as_object().unwrap_or(&empty_config);
match validator.check_config(plugin, key, current_plugin_config, new_plugin_config) {
Ok(Some(val)) => Value::Object(val),
Ok(None) => new_value,
Err(e) => return Err(format!("{e}").into()),
}
} else {
new_value
};
Ok(())
}
fn get<'a>(&'a self, mut key: &str) -> Result<&'a dyn Any, GetError> {
let (current, new_key) = validated_struct::split_once(key, '/');
key = new_key;
let mut value = match self.values.get(current) {
Some(matched) => matched,
None => return Err(GetError::NoMatchingKey),
};
while !key.is_empty() {
let (current, new_key) = validated_struct::split_once(key, '/');
key = new_key;
let matched = match value {
serde_json::Value::Null
| serde_json::Value::Bool(_)
| serde_json::Value::Number(_)
| serde_json::Value::String(_) => return Err(GetError::NoMatchingKey),
serde_json::Value::Array(a) => a.get(match current.parse::<usize>() {
Ok(i) => i,
Err(_) => return Err(GetError::NoMatchingKey),
}),
serde_json::Value::Object(v) => v.get(current),
};
value = match matched {
Some(matched) => matched,
None => return Err(GetError::NoMatchingKey),
}
}
Ok(value)
}
type Keys = Vec<String>;
fn keys(&self) -> Self::Keys {
self.values.as_object().unwrap().keys().cloned().collect()
}
fn get_json(&self, mut key: &str) -> Result<String, GetError> {
let (current, new_key) = validated_struct::split_once(key, '/');
key = new_key;
let mut value = match self.values.get(current) {
Some(matched) => matched,
None => return Err(GetError::NoMatchingKey),
};
while !key.is_empty() {
let (current, new_key) = validated_struct::split_once(key, '/');
key = new_key;
let matched = match value {
serde_json::Value::Null
| serde_json::Value::Bool(_)
| serde_json::Value::Number(_)
| serde_json::Value::String(_) => return Err(GetError::NoMatchingKey),
serde_json::Value::Array(a) => a.get(match current.parse::<usize>() {
Ok(i) => i,
Err(_) => return Err(GetError::NoMatchingKey),
}),
serde_json::Value::Object(v) => v.get(current),
};
value = match matched {
Some(matched) => matched,
None => return Err(GetError::NoMatchingKey),
}
}
Ok(serde_json::to_string(value).unwrap())
}
}
#[macro_export]
macro_rules! unwrap_or_default {
($val:ident$(.$field:ident($($param:ident)?))*) => {
$val$(.$field($($param)?))*.clone().unwrap_or(zenoh_config::defaults$(::$field$(($param))?)*.into())
};
}
pub trait IConfig: Send + Sync {
fn get(&self, key: &str) -> ZResult<String>;
fn queries_default_timeout_ms(&self) -> u64;
fn insert_json5(&self, key: &str, value: &str) -> ZResult<()>;
fn to_json(&self) -> String;
}
pub struct GenericConfig(Arc<dyn IConfig>);
impl Deref for GenericConfig {
type Target = Arc<dyn IConfig>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl GenericConfig {
pub fn new(value: Arc<dyn IConfig>) -> Self {
GenericConfig(value)
}
pub fn get_typed<T: for<'a> Deserialize<'a>>(&self, key: &str) -> ZResult<T> {
self.0
.get(key)
.and_then(|v| serde_json::from_str::<T>(&v).map_err(|e| e.into()))
}
pub fn get_plugin_config(&self, plugin_name: &str) -> ZResult<Value> {
self.get(&("plugins/".to_owned() + plugin_name))
.and_then(|v| serde_json::from_str(&v).map_err(|e| e.into()))
}
}
impl fmt::Display for GenericConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0.to_json())
}
}
#[cfg(test)]
mod tests {
use std::{env, fs::File, io::Write, str::FromStr, time::SystemTime};
use zenoh_protocol::core::{EndPoint, WhatAmI};
use crate::{Config, ModeDependentValue, ZenohId};
#[test]
fn test_toml_config_format() {
const FILE_CONTENTS: &str = r#"
id = "abc"
mode = "router"
[listen]
endpoints = ["tcp/localhost:7448"]
[adminspace]
enabled = true
"#;
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let path = env::temp_dir().join(format!("{timestamp}.test.config.toml"));
{
let mut tmp = File::create(&path).unwrap();
tmp.write_all(FILE_CONTENTS.as_bytes()).unwrap();
tmp.flush().unwrap();
}
let expected_config = {
let mut c = Config::default();
c.set_id(Some(ZenohId::from_str("abc").unwrap())).unwrap();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen
.set_endpoints(ModeDependentValue::Unique(vec![EndPoint::from_str(
"tcp/localhost:7448",
)
.unwrap()]))
.unwrap();
c.adminspace.set_enabled(true).unwrap();
c
};
assert_eq!(
Config::from_file(&path).unwrap().to_string(),
expected_config.to_string()
);
}
}