use clap::Parser;
use futures::future;
use git_version::git_version;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
#[cfg(feature = "loki")]
use url::Url;
use zenoh::{config::WhatAmI, Config, Result};
use zenoh_config::{EndPoint, ModeDependentValue, PermissionsConf};
use zenoh_util::LibSearchDirs;
#[cfg(feature = "loki")]
const LOKI_ENDPOINT_VAR: &str = "LOKI_ENDPOINT";
#[cfg(feature = "loki")]
const LOKI_API_KEY_VAR: &str = "LOKI_API_KEY";
#[cfg(feature = "loki")]
const LOKI_API_KEY_HEADER_VAR: &str = "LOKI_API_KEY_HEADER";
const GIT_VERSION: &str = git_version!(prefix = "v", cargo_prefix = "v");
lazy_static::lazy_static!(
static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION"));
);
#[derive(Debug, Parser)]
#[command(version=GIT_VERSION, long_version=LONG_VERSION.as_str(), about="The zenoh router")]
struct Args {
#[arg(short, long, value_name = "PATH")]
config: Option<String>,
#[arg(short, long, value_name = "ENDPOINT")]
listen: Vec<String>,
#[arg(short = 'e', long, value_name = "ENDPOINT")]
connect: Vec<String>,
#[arg(short, long)]
id: Option<String>,
#[arg(short = 'P', long)]
plugin: Vec<String>,
#[arg(long, value_name = "PATH")]
plugin_search_dir: Vec<String>,
#[arg(long)]
no_timestamp: bool,
#[arg(long)]
no_multicast_scouting: bool,
#[arg(long, value_name = "SOCKET")]
rest_http_port: Option<String>,
#[arg(long)]
cfg: Vec<String>,
#[arg(long, value_name = "[r|w|rw|none]")]
adminspace_permissions: Option<String>,
}
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
init_logging().unwrap();
tracing::info!("zenohd {}", *LONG_VERSION);
let args = Args::parse();
let config = config_from_args(&args);
tracing::info!("Initial conf: {}", &config);
let _session = match zenoh::open(config).await {
Ok(runtime) => runtime,
Err(e) => {
println!("{e}. Exiting...");
std::process::exit(-1);
}
};
future::pending::<()>().await;
});
}
fn config_from_args(args: &Args) -> Config {
let mut config = args
.config
.as_ref()
.map_or_else(Config::default, |conf_file| {
Config::from_file(conf_file).unwrap()
});
if config.mode().is_none() {
config.set_mode(Some(WhatAmI::Router)).unwrap();
}
if let Some(id) = &args.id {
config.set_id(id.parse().unwrap()).unwrap();
}
if args.rest_http_port.is_some() || args.config.is_none() {
let value = args.rest_http_port.as_deref().unwrap_or("8000");
if !value.eq_ignore_ascii_case("none") {
config
.insert_json5("plugins/rest/http_port", &format!(r#""{value}""#))
.unwrap();
config
.insert_json5("plugins/rest/__required__", "true")
.unwrap();
}
}
config.adminspace.set_enabled(true).unwrap();
config.plugins_loading.set_enabled(true).unwrap();
if !args.plugin_search_dir.is_empty() {
config
.plugins_loading
.set_search_dirs(LibSearchDirs::from_paths(&args.plugin_search_dir))
.unwrap();
}
for plugin in &args.plugin {
match plugin.split_once(':') {
Some((name, path)) => {
config
.insert_json5(&format!("plugins/{name}/__required__"), "true")
.unwrap();
config
.insert_json5(&format!("plugins/{name}/__path__"), &format!("\"{path}\""))
.unwrap();
}
None => config
.insert_json5(&format!("plugins/{plugin}/__required__"), "true")
.unwrap(),
}
}
if !args.connect.is_empty() {
config
.connect
.endpoints
.set(
args.connect
.iter()
.map(|v| match v.parse::<EndPoint>() {
Ok(v) => v,
Err(e) => {
panic!("Couldn't parse option --peer={} into Locator: {}", v, e);
}
})
.collect(),
)
.unwrap();
}
if !args.listen.is_empty() {
config
.listen
.endpoints
.set(
args.listen
.iter()
.map(|v| match v.parse::<EndPoint>() {
Ok(v) => v,
Err(e) => {
panic!("Couldn't parse option --listen={} into Locator: {}", v, e);
}
})
.collect(),
)
.unwrap();
}
if args.no_timestamp {
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(false)))
.unwrap();
};
match (
config.scouting.multicast.enabled().is_none(),
args.no_multicast_scouting,
) {
(_, true) => {
config.scouting.multicast.set_enabled(Some(false)).unwrap();
}
(true, false) => {
config.scouting.multicast.set_enabled(Some(true)).unwrap();
}
(false, false) => {}
};
if let Some(adminspace_permissions) = &args.adminspace_permissions {
match adminspace_permissions.as_str() {
"r" => config
.adminspace
.set_permissions(PermissionsConf {
read: true,
write: false,
})
.unwrap(),
"w" => config
.adminspace
.set_permissions(PermissionsConf {
read: false,
write: true,
})
.unwrap(),
"rw" => config
.adminspace
.set_permissions(PermissionsConf {
read: true,
write: true,
})
.unwrap(),
"none" => config
.adminspace
.set_permissions(PermissionsConf {
read: false,
write: false,
})
.unwrap(),
s => panic!(
r#"Invalid option: --adminspace-permissions={} - Accepted values: "r", "w", "rw" or "none""#,
s
),
};
}
for json in &args.cfg {
if let Some((key, value)) = json.split_once(':') {
match json5::Deserializer::from_str(value) {
Ok(mut deserializer) => {
if let Err(e) =
config.insert(key.strip_prefix('/').unwrap_or(key), &mut deserializer)
{
tracing::warn!("Couldn't perform configuration {}: {}", json, e);
}
}
Err(e) => tracing::warn!("Couldn't perform configuration {}: {}", json, e),
}
} else {
panic!(
"--cfg accepts KEY:VALUE pairs. {} is not a valid KEY:VALUE pair.",
json
)
}
}
tracing::debug!("Config: {:?}", &config);
config
}
fn init_logging() -> Result<()> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("z=info"));
let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_thread_ids(true)
.with_thread_names(true)
.with_level(true)
.with_target(true);
let tracing_sub = tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer);
#[cfg(feature = "loki")]
match (
get_loki_endpoint(),
get_loki_apikey(),
get_loki_apikey_header(),
) {
(Some(loki_url), Some(header), Some(apikey)) => {
let (loki_layer, task) = tracing_loki::builder()
.label("service", "zenoh")?
.http_header(header, apikey)?
.build_url(Url::parse(&loki_url)?)?;
tracing_sub.with(loki_layer).init();
tokio::spawn(task);
return Ok(());
}
_ => {
tracing::warn!("Missing one of the required header for Loki!")
}
};
tracing_sub.init();
Ok(())
}
#[cfg(feature = "loki")]
pub fn get_loki_endpoint() -> Option<String> {
std::env::var(LOKI_ENDPOINT_VAR).ok()
}
#[cfg(feature = "loki")]
pub fn get_loki_apikey() -> Option<String> {
std::env::var(LOKI_API_KEY_VAR).ok()
}
#[cfg(feature = "loki")]
pub fn get_loki_apikey_header() -> Option<String> {
std::env::var(LOKI_API_KEY_HEADER_VAR).ok()
}
#[test]
#[cfg(feature = "default")]
fn test_default_features() {
assert_eq!(
zenoh::FEATURES,
concat!(
" zenoh/auth_pubkey",
" zenoh/auth_usrpwd",
" zenoh/transport_multilink",
" zenoh/transport_quic",
" zenoh/transport_tcp",
" zenoh/transport_tls",
" zenoh/transport_udp",
" zenoh/transport_unixsock-stream",
" zenoh/transport_ws",
" zenoh/unstable",
" zenoh/default",
)
);
}
#[test]
#[cfg(not(feature = "default"))]
fn test_no_default_features() {
assert_eq!(
zenoh::FEATURES,
concat!(
" zenoh/unstable",
)
);
}