use std::{
env, fmt,
path::Path,
sync::{Arc, Mutex, MutexGuard},
};
use serde::{Deserialize, Serialize};
use zenoh_config::ExpandedConfig;
use zenoh_result::{bail, ZResult};
/// Zenoh configuration: a newtype wrapper around [`zenoh_config::Config`].
///
/// The contents of the `DEFAULT_CONFIG.json5` file are embedded below as a
/// `json5` code block.
///
#[doc = concat!(
"```json5\n",
include_str!("../../DEFAULT_CONFIG.json5"),
"\n```"
)]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct Config(pub(crate) zenoh_config::Config);
impl Config {
pub const DEFAULT_CONFIG_PATH_ENV: &'static str = "ZENOH_CONFIG";
pub fn from_env() -> ZResult<Self> {
let path = env::var(Self::DEFAULT_CONFIG_PATH_ENV)?;
Ok(Config(zenoh_config::Config::from_file(Path::new(&path))?))
}
pub fn from_file<P: AsRef<Path>>(path: P) -> ZResult<Self> {
Ok(Config(zenoh_config::Config::from_file(path)?))
}
pub fn from_json5(input: &str) -> ZResult<Config> {
match zenoh_config::Config::from_deserializer(&mut json5::Deserializer::from_str(input)?) {
Ok(config) => Ok(Config(config)),
Err(Ok(_)) => {
Err(zerror!("The config was correctly deserialized, but it is invalid").into())
}
Err(Err(err)) => Err(err.into()),
}
}
pub fn remove<K: AsRef<str>>(&mut self, key: K) -> ZResult<()> {
match key.as_ref().split_once("=") {
None => self.0.remove(key.as_ref()),
Some((prefix, id_value)) => {
let (key, id_key) = prefix.rsplit_once("/").ok_or("missing id")?;
let current = serde_json::from_str::<serde_json::Value>(&self.get_json(key)?)?;
let serde_json::Value::Array(mut list) = current else {
bail!("not an array")
};
let prev_len = list.len();
list.retain(|item| match item {
serde_json::Value::Object(map) => {
map.get(id_key).and_then(|v| v.as_str()) != Some(id_value)
}
_ => true,
});
if list.len() != prev_len {
self.0.insert_json5(key, &serde_json::to_string(&list)?)?;
}
Ok(())
}
}
}
pub fn insert_json5(&mut self, key: &str, value: &str) -> ZResult<()> {
match key.split_once("=") {
None => self.0.insert_json5(key, value),
Some((prefix, id_value)) => {
let (key, id_key) = prefix.rsplit_once("/").ok_or("missing id")?;
let new_item = json5::from_str::<serde_json::Value>(value)?;
if new_item
.as_object()
.and_then(|map| map.get(id_key))
.and_then(|v| v.as_str())
!= Some(id_value)
{
bail!("id mismatch");
}
let current = serde_json::from_str::<serde_json::Value>(&self.get_json(key)?)?;
let serde_json::Value::Array(mut list) = current else {
bail!("not an array")
};
let mut new_item = Some(new_item);
for item in list.iter_mut() {
let serde_json::Value::Object(map) = item else {
bail!("array item is not an object");
};
if map.get(id_key).and_then(|v| v.as_str()) == Some(id_value) {
*item = new_item.take().unwrap();
break;
}
}
if let Some(new_item) = new_item {
list.push(new_item);
}
self.0.insert_json5(key, &serde_json::to_string(&list)?)
}
}
.map_err(|err| zerror!("{err}").into())
}
pub fn get_json(&self, key: &str) -> ZResult<String> {
self.0.get_json(key).map_err(|err| zerror!("{err}").into())
}
#[zenoh_macros::unstable]
#[allow(clippy::result_large_err)]
pub fn from_deserializer<'d, D: serde::Deserializer<'d>>(
d: D,
) -> Result<Self, Result<Self, D::Error>>
where
Self: serde::Deserialize<'d>,
{
match zenoh_config::Config::from_deserializer(d) {
Ok(config) => Ok(Config(config)),
Err(result) => match result {
Ok(config) => Err(Ok(Config(config))),
Err(err) => Err(Err(err)),
},
}
}
}
#[zenoh_macros::unstable]
impl std::ops::Deref for Config {
type Target = zenoh_config::Config;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[zenoh_macros::unstable]
impl std::ops::DerefMut for Config {
fn deref_mut(&mut self) -> &mut <Self as std::ops::Deref>::Target {
&mut self.0
}
}
#[doc(hidden)]
impl From<zenoh_config::Config> for Config {
fn from(value: zenoh_config::Config) -> Self {
Self(value)
}
}
impl fmt::Display for Config {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", &self.0)
}
}
/// Message delivered to [`Notifier`] subscribers: the key passed to
/// `Notifier::notify`, held as a shared string.
pub type Notification = Arc<str>;
/// State shared by every clone of a [`Notifier`].
struct NotifierInner<T> {
/// The guarded value.
inner: Mutex<T>,
/// Sending halves of the channels over which notifications are delivered.
subscribers: Mutex<Vec<flume::Sender<Notification>>>,
}
/// Handle to a mutex-guarded value together with a list of notification
/// subscribers.
pub struct Notifier<T> {
/// Reference-counted pointer to the shared state.
inner: Arc<NotifierInner<T>>,
}
impl<T> Clone for Notifier<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl Notifier<ExpandedConfig> {
    /// Creates a notifier guarding `inner`, with no subscribers registered.
    pub fn new(inner: ExpandedConfig) -> Self {
        Notifier {
            inner: Arc::new(NotifierInner {
                inner: Mutex::new(inner),
                subscribers: Mutex::new(Vec::new()),
            }),
        }
    }

    /// Registers a new subscriber and returns the receiving half of its
    /// unbounded channel; every later [`Notifier::notify`] call sends the
    /// notified key to it.
    #[cfg(feature = "plugins")]
    pub fn subscribe(&self) -> flume::Receiver<Notification> {
        let (tx, rx) = flume::unbounded();
        self.lock_subscribers().push(tx);
        rx
    }

    /// Sends `key` to every registered subscriber.
    ///
    /// Subscribers whose channel rejects the message are dropped from the
    /// list; the remaining ones keep their registration order.
    pub fn notify<K: AsRef<str>>(&self, key: K) {
        let key: Notification = Arc::from(key.as_ref());
        // `retain` visits each subscriber exactly once, in order, so every
        // subscriber gets a single send and failed ones are pruned in place.
        self.lock_subscribers()
            .retain(|subscriber| subscriber.send(Arc::clone(&key)).is_ok());
    }

    /// Locks the guarded configuration and returns the guard.
    ///
    /// # Panics
    /// Panics if the configuration mutex is poisoned.
    pub fn lock(&self) -> MutexGuard<'_, ExpandedConfig> {
        self.lock_config()
    }

    /// Locks the subscriber list, panicking if its mutex is poisoned.
    fn lock_subscribers(&self) -> MutexGuard<'_, Vec<flume::Sender<Notification>>> {
        self.inner
            .subscribers
            .lock()
            .expect("acquiring Notifier's subscribers Mutex should not fail")
    }

    /// Locks the configuration, panicking if its mutex is poisoned.
    fn lock_config(&self) -> MutexGuard<'_, ExpandedConfig> {
        self.inner
            .inner
            .lock()
            .expect("acquiring Notifier's Config Mutex should not fail")
    }

    /// Removes `key` from the guarded configuration, then notifies the
    /// subscribers with that key.
    ///
    /// # Errors
    /// Returns the removal error, in which case no notification is sent.
    pub fn remove<K: AsRef<str>>(&self, key: K) -> ZResult<()> {
        // The guard is a temporary of this statement, so the configuration
        // lock is already released when subscribers are notified.
        self.lock_config().remove(key.as_ref())?;
        self.notify(key);
        Ok(())
    }

    /// Inserts the JSON5 `value` at `key` in the guarded configuration, then
    /// notifies the subscribers with that key.
    ///
    /// # Errors
    /// Rejects any key that does not start with `plugins/`, and returns the
    /// insertion error otherwise; no notification is sent on failure.
    pub fn insert_json5(&self, key: &str, value: &str) -> ZResult<()> {
        if !key.starts_with("plugins/") {
            bail!(
                "Error inserting conf value {} : updating config is only \
                supported for keys starting with `plugins/`",
                key
            );
        }
        // As in `remove`, the lock is released before notifying.
        self.lock_config().insert_json5(key, value)?;
        self.notify(key);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use zenoh_config::{InterceptorFlow, QosOverwriteItemConf};

    use crate::Config;

    /// `item1` with an egress flow.
    const ITEM1_EGRESS: &str = r#"{
id: "item1",
messages: ["put"],
key_exprs: ["**"],
overwrite: {
priority: "real_time",
},
flows: ["egress"]
}"#;

    /// `item1` again, this time with an ingress flow.
    const ITEM1_INGRESS: &str = r#"{
id: "item1",
messages: ["put"],
key_exprs: ["**"],
overwrite: {
priority: "real_time",
},
flows: ["ingress"]
}"#;

    /// `item2` with an egress flow.
    const ITEM2_EGRESS: &str = r#"{
id: "item2",
messages: ["put"],
key_exprs: ["**"],
overwrite: {
priority: "real_time",
},
flows: ["egress"]
}"#;

    /// Reads `qos/network` back from `config` and deserializes it into typed items.
    fn network_items(config: &Config) -> Vec<QosOverwriteItemConf> {
        let json = config.get_json("qos/network").unwrap();
        serde_json::from_str::<Vec<QosOverwriteItemConf>>(&json).unwrap()
    }

    /// Asserts the id and the first flow of a single item.
    fn assert_item(item: &QosOverwriteItemConf, id: &str, flow: InterceptorFlow) {
        assert_eq!(item.id.as_ref().unwrap(), id);
        assert_eq!(*item.flows.as_ref().unwrap().first(), flow);
    }

    #[test]
    fn insert_remove_list_item() {
        let mut config = Config::default();

        // Inserting into the list creates the first element.
        config
            .insert_json5("qos/network/id=item1", ITEM1_EGRESS)
            .unwrap();
        let items = network_items(&config);
        assert_eq!(items.len(), 1);
        assert_item(&items[0], "item1", InterceptorFlow::Egress);

        // Inserting the same id again replaces the element in place.
        config
            .insert_json5("qos/network/id=item1", ITEM1_INGRESS)
            .unwrap();
        let items = network_items(&config);
        assert_eq!(items.len(), 1);
        assert_item(&items[0], "item1", InterceptorFlow::Ingress);

        // A new id is appended after the existing element.
        config
            .insert_json5("qos/network/id=item2", ITEM2_EGRESS)
            .unwrap();
        let items = network_items(&config);
        assert_eq!(items.len(), 2);
        assert_item(&items[0], "item1", InterceptorFlow::Ingress);
        assert_item(&items[1], "item2", InterceptorFlow::Egress);

        // Removing by id drops only the matching element.
        config.remove("qos/network/id=item2").unwrap();
        let items = network_items(&config);
        assert_eq!(items.len(), 1);
        assert_item(&items[0], "item1", InterceptorFlow::Ingress);

        // Removing the last element leaves an empty list.
        config.remove("qos/network/id=item1").unwrap();
        let items = network_items(&config);
        assert_eq!(items.len(), 0);
    }
}