use std::{
collections::HashMap,
convert::TryInto,
io::Write,
mem,
sync::{Arc, Mutex},
};
use itertools::Itertools;
use serde_json::json;
use tracing::{error, trace};
use zenoh_buffers::buffer::SplitBuffer;
use zenoh_config::{wrappers::ZenohId, ConfigValidator};
use zenoh_core::Wait;
use zenoh_keyexpr::keyexpr;
use zenoh_link::Link;
#[cfg(all(feature = "plugins", feature = "runtime_plugins"))]
use zenoh_plugin_trait::PluginDiff;
#[cfg(feature = "plugins")]
use zenoh_plugin_trait::{PluginControl, PluginStatus};
use zenoh_protocol::{
core::{
key_expr::OwnedKeyExpr, ExprId, Region, Reliability, WireExpr, ZenohIdProto, EMPTY_EXPR_ID,
},
network::{
declare::{queryable::ext::QueryableInfoType, QueryableId},
ext, Declare, DeclareBody, DeclareQueryable, DeclareSubscriber, Interest, Push, Request,
Response, ResponseFinal,
},
zenoh::{PushBody, RequestBody},
};
use zenoh_result::ZResult;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast, TransportPeer};
use super::{routing::dispatcher::face::Face, Runtime};
#[cfg(all(feature = "plugins", feature = "runtime_plugins"))]
use crate::api::plugins::PluginsManager;
#[cfg(all(feature = "plugins", feature = "runtime_plugins"))]
use crate::internal::runtime::DynamicRuntime;
use crate::{
api::{
bytes::ZBytes,
key_expr::KeyExpr,
queryable::{Query, QueryInner, ReplyPrimitives},
},
bytes::Encoding,
net::{
primitives::Primitives,
routing::{dispatcher::tables::Tables, gateway::Resource, hat::Sources},
runtime::region,
},
LONG_VERSION,
};
pub const METRICS_ENCODING: &str = "application/openmetrics-text; version=1.0.0; charset=utf-8";
pub struct AdminContext {
runtime: Runtime,
}
type Handler = Arc<dyn Fn(&keyexpr, &AdminContext, Query) + Send + Sync>;
pub struct AdminSpace {
zid: ZenohId,
queryable_id: QueryableId,
primitives: Mutex<Option<Arc<Face>>>,
mappings: Mutex<HashMap<ExprId, String>>,
handlers: HashMap<OwnedKeyExpr, (Handler, OwnedKeyExpr)>,
context: Arc<AdminContext>,
}
impl ConfigValidator for AdminSpace {
fn check_config(
&self,
name: &str,
path: &str,
current: &serde_json::Map<String, serde_json::Value>,
new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
#[cfg(feature = "plugins")]
{
let plugins_mgr = self.context.runtime.plugins_manager();
let Some(plugin) = plugins_mgr.started_plugin(name) else {
tracing::warn!("Plugin `{}` is not started", name);
return Ok(None);
};
plugin
.instance()
.config_checker(path, ¤t.into(), &new.into())
.map(|m| m.map(|kv| kv.into()))
}
#[cfg(not(feature = "plugins"))]
{
let _ = (name, path, current, new);
Ok(None)
}
}
}
impl AdminSpace {
#[cfg(all(feature = "plugins", feature = "runtime_plugins"))]
fn start_plugin(
plugin_mgr: &mut PluginsManager,
config: &zenoh_config::PluginLoad,
start_args: &DynamicRuntime,
required: bool,
) -> ZResult<()> {
let id = &config.id;
let name = &config.name;
let declared = if let Some(declared) = plugin_mgr.plugin_mut(id) {
tracing::warn!("Plugin `{}` was already declared", declared.id());
declared
} else if let Some(paths) = &config.paths {
plugin_mgr.declare_dynamic_plugin_by_paths(id, name, paths, required)?
} else {
plugin_mgr.declare_dynamic_plugin_by_name(id, name, required)?
};
let loaded = if let Some(loaded) = declared.loaded_mut() {
tracing::warn!(
"Plugin `{}` was already loaded from {}",
loaded.id(),
loaded.path()
);
loaded
} else {
match declared.load()? {
Some(loaded) => loaded,
None => {
tracing::warn!(
"Plugin `{}` will not be loaded as plugin loading is disabled",
config.name
);
return Ok(());
}
}
};
if let Some(started) = loaded.started_mut() {
tracing::warn!("Plugin `{}` was already started", started.id());
} else {
let started = loaded.start(start_args)?;
tracing::info!(
"Successfully started plugin `{}` from {}",
started.id(),
started.path()
);
};
Ok(())
}
pub async fn start(runtime: &Runtime) {
let zid_str = runtime.state.zid.to_string();
let whatami_str = runtime.state.whatami.to_str();
let config = &mut runtime.config().lock();
let root_key: OwnedKeyExpr = format!("@/{zid_str}/{whatami_str}").try_into().unwrap();
let mut handlers: HashMap<OwnedKeyExpr, (Handler, OwnedKeyExpr)> = HashMap::new();
macro_rules! add_handler {
($key:expr, $glob:expr, $handler:expr) => {{
let key_expr = keyexpr::new($key).unwrap();
let glob_expr = keyexpr::new($glob).unwrap();
let prefix = &root_key / key_expr;
let full_key = &prefix / glob_expr;
handlers.insert(full_key, (Arc::new($handler), prefix));
}};
($key:expr, $handler:expr) => {{
let key_expr = keyexpr::new($key).unwrap();
let full_key = &root_key / key_expr;
handlers.insert(full_key.clone(), (Arc::new($handler), full_key));
}};
($handler:expr) => {{
handlers.insert(root_key.clone(), (Arc::new($handler), root_key.clone()));
}};
}
add_handler!(local_data);
add_handler!("metrics", metrics);
add_handler!("linkstate", "*", linkstate_data);
add_handler!("subscriber", "**", subscribers_data);
add_handler!("publisher", "**", publishers_data);
add_handler!("queryable", "**", queryables_data);
add_handler!("querier", "**", queriers_data);
add_handler!("token", "**", tokens_data);
add_handler!("route/successor", "**", route_successor);
#[cfg(feature = "plugins")]
add_handler!("plugins", "**", plugins_data);
#[cfg(feature = "plugins")]
add_handler!("status/plugins", "**", plugins_status);
#[cfg(all(feature = "plugins", feature = "runtime_plugins"))]
let mut active_plugins = runtime
.plugins_manager()
.started_plugins_iter()
.map(|rec| (rec.id().to_string(), rec.path().to_string()))
.collect::<HashMap<_, _>>();
let context = Arc::new(AdminContext {
runtime: runtime.clone(),
});
let admin = Arc::new(AdminSpace {
zid: runtime.zid(),
queryable_id: runtime.next_id(),
primitives: Mutex::new(None),
mappings: Mutex::new(HashMap::new()),
handlers,
context,
});
config.set_plugin_validator(Arc::downgrade(&admin));
#[cfg(all(feature = "plugins", feature = "runtime_plugins"))]
{
let cfg_rx = admin.context.runtime.state.config.subscribe();
zenoh_runtime::ZRuntime::Net.spawn({
let admin = admin.clone();
async move {
while let Ok(change) = cfg_rx.recv_async().await {
let change = change.strip_prefix('/').unwrap_or(&change);
if !change.starts_with("plugins") {
continue;
}
let requested_plugins = {
let cfg_guard = admin.context.runtime.state.config.lock();
cfg_guard.plugins().load_requests().collect::<Vec<_>>()
};
let mut diffs = Vec::new();
for plugin in active_plugins.keys() {
if !requested_plugins.iter().any(|r| &r.id == plugin) {
diffs.push(PluginDiff::Delete(plugin.clone()))
}
}
for request in requested_plugins {
if let Some(active) = active_plugins.get(&request.id) {
if request
.paths
.as_ref()
.map(|p| p.contains(active))
.unwrap_or(true)
{
continue;
}
diffs.push(PluginDiff::Delete(request.id.clone()))
}
diffs.push(PluginDiff::Start(request))
}
let mut plugins_mgr = admin.context.runtime.plugins_manager();
for diff in diffs {
match diff {
PluginDiff::Delete(id) => {
active_plugins.remove(id.as_str());
if let Some(running) = plugins_mgr.started_plugin_mut(&id) {
running.stop()
}
}
PluginDiff::Start(plugin) => {
let dynamic_runtime = admin.context.runtime.clone().into();
if let Err(e) = Self::start_plugin(
&mut plugins_mgr,
&plugin,
&dynamic_runtime,
plugin.required,
) {
if plugin.required {
panic!("Failed to load plugin `{}`: {}", plugin.id, e)
} else {
tracing::error!(
"Failed to load plugin `{}`: {}",
plugin.id,
e
)
}
}
}
}
}
}
tracing::info!("Running plugins: {:?}", &active_plugins)
}
});
}
let _span =
tracing::debug_span!("adminspace", zid = %ZenohIdProto::from(admin.zid).short())
.entered();
let primitives = runtime.state.router.new_session(admin.clone());
zlock!(admin.primitives).replace(primitives.clone());
primitives.send_declare(&mut Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareQueryable(DeclareQueryable {
id: runtime.next_id(),
wire_expr: [&root_key, "/**"].concat().into(),
ext_info: QueryableInfoType::DEFAULT,
}),
});
primitives.send_declare(&mut Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
id: runtime.next_id(),
wire_expr: [&root_key, "/config/**"].concat().into(),
}),
});
}
pub fn key_expr_to_string<'a>(&self, key_expr: &'a WireExpr) -> ZResult<KeyExpr<'a>> {
if key_expr.scope == EMPTY_EXPR_ID {
key_expr.suffix.as_ref().try_into()
} else if key_expr.suffix.is_empty() {
match zlock!(self.mappings).get(&key_expr.scope) {
Some(prefix) => prefix.clone().try_into(),
None => bail!("Failed to resolve ExprId {}", key_expr.scope),
}
} else {
match zlock!(self.mappings).get(&key_expr.scope) {
Some(prefix) => format!("{}{}", prefix, key_expr.suffix.as_ref()).try_into(),
None => bail!("Failed to resolve ExprId {}", key_expr.scope),
}
}
}
}
impl Primitives for AdminSpace {
fn send_interest(&self, msg: &mut Interest) {
tracing::trace!("Recv interest {:?}", msg);
}
fn send_declare(&self, msg: &mut Declare) {
tracing::trace!("Recv declare {:?}", msg);
if let DeclareBody::DeclareKeyExpr(m) = &msg.body {
match self.key_expr_to_string(&m.wire_expr) {
Ok(s) => {
zlock!(self.mappings).insert(m.id, s.into());
}
Err(e) => error!("Unknown expr_id {}! ({})", m.id, e),
}
}
}
fn send_push_consume(&self, msg: &mut Push, _reliability: Reliability, _consume: bool) {
trace!("recv Push {:?}", msg);
{
let conf = &self.context.runtime.state.config.lock();
if !conf.adminspace.permissions().write {
tracing::error!(
"Received PUT on '{}' but adminspace.permissions.write=false in configuration",
msg.wire_expr
);
return;
}
}
if let Some(key) = msg.wire_expr.as_str().strip_prefix(&format!(
"@/{}/{}/config/",
self.context.runtime.state.zid, self.context.runtime.state.whatami,
)) {
match &msg.payload {
PushBody::Put(put) => match std::str::from_utf8(&put.payload.contiguous()) {
Ok(json) => {
tracing::trace!(
"Insert conf value @/{}/{}/config/{} : {}",
self.context.runtime.state.zid,
self.context.runtime.state.whatami,
key,
json
);
if let Err(e) = self.context.runtime.state.config.insert_json5(key, json) {
error!(
"Error inserting conf value @/{}/{}/config/{} : {} - {}",
self.context.runtime.state.zid,
self.context.runtime.state.whatami,
key,
json,
e
);
}
}
Err(e) => error!(
"Received non utf8 conf value on @/{}/{}/config/{} : {}",
self.context.runtime.state.zid, self.context.runtime.state.whatami, key, e
),
},
PushBody::Del(_) => {
tracing::trace!(
"Deleting conf value /@/{}/{}/config/{}",
self.context.runtime.state.zid,
self.context.runtime.state.whatami,
key
);
if let Err(e) = self.context.runtime.state.config.remove(key) {
tracing::error!("Error deleting conf value {} : {}", msg.wire_expr, e)
}
}
}
}
}
fn send_request(&self, msg: &mut Request) {
trace!("recv Request {:?}", msg);
match &mut msg.payload {
RequestBody::Query(query) => {
let _span =
tracing::debug_span!("adminspace", zid = %ZenohIdProto::from(self.zid).short())
.entered();
let primitives = zlock!(self.primitives).as_ref().unwrap().clone();
{
let conf = &self.context.runtime.state.config.lock();
if !conf.adminspace.permissions().read {
tracing::error!(
"Received GET on '{}' but adminspace.permissions.read=false in configuration",
msg.wire_expr
);
primitives.send_response_final(&mut ResponseFinal {
rid: msg.id,
ext_qos: msg.ext_qos,
ext_tstamp: None,
});
return;
}
}
let key_expr = match self.key_expr_to_string(&msg.wire_expr) {
Ok(key_expr) => key_expr.into_owned(),
Err(e) => {
tracing::error!("Unknown KeyExpr: {}", e);
primitives.send_response_final(&mut ResponseFinal {
rid: msg.id,
ext_qos: msg.ext_qos,
ext_tstamp: None,
});
return;
}
};
let zid = self.zid;
let query = Query {
inner: Arc::new(QueryInner {
key_expr: key_expr.clone(),
parameters: mem::take(&mut query.parameters).into(),
qid: msg.id,
zid: zid.into(),
qos: msg.ext_qos.into(),
#[cfg(feature = "unstable")]
source_info: query.ext_sinfo.map(Into::into),
primitives: ReplyPrimitives::new_remote(None, primitives),
}),
eid: self.queryable_id,
value: mem::take(&mut query.ext_body)
.map(|b| (b.payload.into(), b.encoding.into())),
attachment: query.ext_attachment.take().map(Into::into),
};
for (full_key, (handler, prefix)) in &self.handlers {
if key_expr.intersects(full_key) {
handler(prefix, &self.context, query.clone());
}
}
}
}
}
fn send_response(&self, msg: &mut Response) {
trace!("recv Response {:?}", msg);
}
fn send_response_final(&self, msg: &mut ResponseFinal) {
trace!("recv ResponseFinal {:?}", msg);
}
fn send_close(&self) {
trace!("recv Close");
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
impl crate::net::primitives::EPrimitives for AdminSpace {
#[inline]
fn send_interest(&self, ctx: crate::net::routing::RoutingContext<&mut Interest>) -> bool {
(self as &dyn Primitives).send_interest(ctx.msg);
false
}
#[inline]
fn send_declare(&self, ctx: crate::net::routing::RoutingContext<&mut Declare>) -> bool {
(self as &dyn Primitives).send_declare(ctx.msg);
false
}
#[inline]
fn send_push(&self, msg: &mut Push, reliability: Reliability) -> bool {
(self as &dyn Primitives).send_push(msg, reliability);
false
}
#[inline]
fn send_request(&self, msg: &mut Request) -> bool {
(self as &dyn Primitives).send_request(msg);
false
}
#[inline]
fn send_response(&self, msg: &mut Response) -> bool {
(self as &dyn Primitives).send_response(msg);
false
}
#[inline]
fn send_response_final(&self, msg: &mut ResponseFinal) -> bool {
(self as &dyn Primitives).send_response_final(msg);
false
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
#[tracing::instrument(level = "trace", skip_all)]
fn local_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
let transport_mgr = context.runtime.manager().clone();
#[cfg(feature = "plugins")]
let plugins: serde_json::Value = {
let plugins_mgr = context.runtime.plugins_manager();
plugins_mgr
.started_plugins_iter()
.map(|rec| (rec.id(), json!({"name":rec.name(), "path": rec.path() })))
.collect()
};
#[cfg(not(all(feature = "unstable", feature = "plugins")))]
let plugins = serde_json::Value::Null;
let locators: Vec<serde_json::Value> = transport_mgr
.get_locators()
.iter()
.map(|locator| json!(locator.as_str()))
.collect();
let links_info = context.runtime.get_links_info();
let config = context.runtime.config().lock().clone();
let transport_unicast_to_region = move |transport: &TransportUnicast| -> Option<Region> {
let peer = transport.get_peer().ok()?;
let transient_remote_bound = transport.get_bound().ok()?;
let (region, _) =
region::compute_region_of(&peer, &config, transient_remote_bound.as_ref()).ok()?;
Some(region)
};
let transport_unicast_to_json = move |transport: &TransportUnicast| {
let link_to_json = |link: &Link| {
json!({
"src": link.src.to_string(),
"dst": link.dst.to_string()
})
};
let links = transport
.get_links()
.unwrap_or_default()
.iter()
.map(link_to_json)
.collect_vec();
#[cfg(feature = "shared-memory")]
let shm = transport.is_shm().unwrap_or_default();
#[cfg(not(feature = "shared-memory"))]
let shm = false;
let json = json!({
"peer": transport.get_zid().map_or_else(|_| "unknown".to_string(), |p| p.to_string()),
"whatami": transport.get_whatami().map_or_else(|_| "unknown".to_string(), |p| p.to_string()),
"links": links,
"weight": transport.get_zid().ok().and_then(|zid| links_info.get(&zid)),
"shm": shm,
"region": transport_unicast_to_region(transport).map_or_else(|| "unknown".to_string(), |r| r.to_string())
});
json
};
let transport_multicast_peer_to_json =
|transport: &TransportMulticast, mcast_peer: &TransportPeer| {
let link_to_json = |link: &Link| {
json!({
"src": link.src.to_string(),
"dst": link.dst.to_string()
})
};
let links = mcast_peer.links.iter().map(link_to_json).collect_vec();
let json = json!({
"peer": mcast_peer.zid.to_string(),
"whatami": mcast_peer.whatami.to_string(),
"group": transport
.get_link()
.ok()
.and_then(|t| t.group.map(|g| g.to_string()))
.unwrap_or("unknown".to_string()),
"links": links,
});
json
};
let mut transports: Vec<serde_json::Value> = vec![];
zenoh_runtime::ZRuntime::Net.block_in_place(async {
for transport in transport_mgr.get_transports_unicast().await {
transports.push(transport_unicast_to_json(&transport));
}
for mcast_transport in transport_mgr.get_transports_multicast().await {
if let Ok(peers) = mcast_transport.get_peers() {
for mcast_peer in &peers {
transports.push(transport_multicast_peer_to_json(
&mcast_transport,
mcast_peer,
));
}
}
}
});
#[cfg_attr(not(feature = "stats"), allow(unused_mut))]
let mut json = json!({
"zid": context.runtime.state.zid,
"version": &*LONG_VERSION,
"metadata": context.runtime.config().lock().metadata(),
"locators": locators,
"sessions": transports,
"plugins": plugins,
});
#[cfg(feature = "stats")]
if query
.parameters()
.iter()
.any(|(k, v)| k == "_stats" && v != "false")
{
context.runtime.stats().merge_stats(&mut json);
}
tracing::trace!("AdminSpace router_data: {:?}", json);
let payload = match serde_json::to_vec(&json) {
Ok(bytes) => ZBytes::from(bytes),
Err(e) => {
tracing::error!("Error serializing AdminSpace reply: {:?}", e);
return;
}
};
if let Err(e) = query
.reply(prefix, payload)
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
#[tracing::instrument(level = "trace", skip_all)]
fn metrics(prefix: &keyexpr, context: &AdminContext, query: Query) {
#[cfg(not(feature = "stats"))]
let mut metrics = format!(
concat!(
"# HELP zenoh_build Zenoh build version.\n",
"# TYPE zenoh_build info\n",
"zenoh_build_info{{local_id=\"{zid}\",local_whatami=\"{whatami}\",version=\"{version}\"}} 1\n",
"# EOF\n ",
),
zid = context.runtime.state.zid,
whatami = context.runtime.state.whatami,
version = &*LONG_VERSION,
);
#[cfg(feature = "stats")]
let mut metrics = String::new();
#[cfg(feature = "stats")]
context
.runtime
.stats()
.encode_metrics(
&mut metrics,
query.parameters().get("per_transport") != Some("false"),
query.parameters().get("per_link") != Some("false"),
query.parameters().get("disconnected") == Some("true"),
query.parameters().get("per_key") != Some("false"),
)
.expect("metrics should be encodable");
if query.parameters().get("descriptors") == Some("false") {
metrics = metrics
.split_inclusive("\n")
.filter(|l| !l.starts_with('#'))
.chain(["# EOF\n"])
.collect();
}
let mut metrics = metrics.into_bytes();
let mut encoding = METRICS_ENCODING.to_string();
if query.parameters().get("compression") != Some("false") {
let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
encoder.write_all(&metrics).unwrap();
metrics = encoder.finish().unwrap();
encoding.push_str(";content-encoding=gzip");
}
if let Err(e) = query.reply(prefix, metrics).encoding(encoding).wait() {
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
fn resources_data<F>(prefix: &keyexpr, context: &AdminContext, query: Query, f: F)
where
F: Fn(&Tables) -> HashMap<Arc<Resource>, Sources>,
{
let tables = &context.runtime.state.router.tables;
let rtables = zread!(tables.tables);
for res in f(&rtables) {
let key = prefix / keyexpr::new(res.0.expr()).unwrap();
if query.key_expr().intersects(&key) {
let payload =
ZBytes::from(serde_json::to_string(&res.1).unwrap_or_else(|_| "{}".to_string()));
if let Err(e) = query
.reply(key, payload)
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
}
}
#[tracing::instrument(level = "trace", skip_all)]
fn subscribers_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
resources_data(prefix, context, query, |tables| {
tables.sourced_subscribers()
});
}
#[tracing::instrument(level = "trace", skip_all)]
fn publishers_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
resources_data(prefix, context, query, |tables| tables.sourced_publishers());
}
#[tracing::instrument(level = "trace", skip_all)]
fn queryables_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
resources_data(prefix, context, query, |tables| tables.sourced_queryables());
}
#[tracing::instrument(level = "trace", skip_all)]
fn queriers_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
resources_data(prefix, context, query, |tables| tables.sourced_queriers());
}
#[tracing::instrument(level = "trace", skip_all)]
fn tokens_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
resources_data(prefix, context, query, |tables| tables.sourced_tokens());
}
#[tracing::instrument(level = "trace", skip_all)]
fn linkstate_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
let tables = &context.runtime.state.router.tables;
let rtables = zread!(tables.tables);
for (region, hat) in rtables
.hats
.iter()
.filter(|(_, hat)| hat.mode().is_peer() || hat.mode().is_router())
{
let reply_key = prefix / &KeyExpr::try_from(format!("{region}")).unwrap();
if query.key_expr().intersects(&reply_key) {
if let Err(e) = query
.reply(reply_key, hat.info())
.encoding(Encoding::TEXT_PLAIN)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
}
}
#[tracing::instrument(level = "trace", skip_all)]
fn route_successor(prefix: &keyexpr, context: &AdminContext, query: Query) {
let reply = |ke: &keyexpr, successor: ZenohIdProto| {
if let Err(e) = query
.reply(ke, serde_json::to_vec(&json!(successor)).unwrap())
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
};
let tables = &context.runtime.state.router.tables;
let rtables = zread!(tables.tables);
let suffix = query.key_expr().as_str().strip_prefix(prefix.as_str());
if let Some((src, dst)) = suffix.and_then(|s| s.strip_prefix("/src/")?.split_once("/dst/")) {
if let (Ok(src_zid), Ok(dst_zid)) = (src.parse(), dst.parse()) {
for hat in rtables.hats.values().filter(|hat| hat.mode().is_router()) {
if let Some(successor) = hat.route_successor(src_zid, dst_zid) {
reply(query.key_expr(), successor);
}
}
}
}
let successors = rtables
.hats
.values()
.filter(|hat| hat.mode().is_router())
.flat_map(|hat| hat.route_successors())
.collect_vec();
drop(rtables);
for entry in successors {
let ke = KeyExpr::new(format!(
"{prefix}/src/{src}/dst/{dst}",
src = entry.source,
dst = entry.destination
))
.unwrap();
if query.key_expr().intersects(&ke) {
reply(&ke, entry.successor);
}
}
}
#[cfg(feature = "plugins")]
#[tracing::instrument(level = "trace", skip_all)]
fn plugins_data(prefix: &keyexpr, context: &AdminContext, query: Query) {
let guard = context.runtime.plugins_manager();
tracing::debug!("requested plugins status {:?}", query.key_expr());
if let [names, ..] = query.key_expr().strip_prefix(prefix)[..] {
let statuses = guard.plugins_status(names);
for status in statuses {
tracing::debug!("plugin status: {:?}", status);
let key = prefix / keyexpr::new(status.id()).unwrap();
match serde_json::to_vec(&status) {
Ok(bytes) => {
if let Err(e) = query
.reply(key, bytes)
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Err(e) => tracing::debug!("Admin query error: {}", e),
}
}
}
}
#[cfg(feature = "plugins")]
fn plugins_status(prefix: &keyexpr, context: &AdminContext, query: Query) {
let key_expr = query.key_expr();
let guard = context.runtime.plugins_manager();
let mut root_key = prefix.as_str().to_string();
for plugin in guard.started_plugins_iter() {
with_extended_string(&mut root_key, &["/", plugin.id()], |plugin_key| {
with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| {
if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) {
if query.key_expr().intersects(&key_expr) {
if let Err(e) = query
.reply(key_expr, plugin.path())
.encoding(Encoding::TEXT_PLAIN)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
} else {
tracing::error!("Error: invalid plugin path key {}", plugin_path_key);
}
});
let matches_plugin = |plugin_status_space: &mut String| {
query
.key_expr()
.intersects(plugin_status_space.as_str().try_into().unwrap())
};
if !with_extended_string(plugin_key, &["/**"], matches_plugin) {
return;
}
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
plugin.instance().adminspace_getter(key_expr, plugin_key)
})) {
Ok(Ok(responses)) => {
for response in responses {
if let Ok(key_expr) = KeyExpr::try_from(response.key) {
match serde_json::to_vec::<serde_json::Value>(&response.value.into()) {
Ok(bytes) => {
if let Err(e) = query.reply(key_expr, bytes).encoding(Encoding::APPLICATION_JSON).wait() {
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Err(e) => tracing::debug!("Admin query error: {}", e),
}
} else {
tracing::error!("Error: plugin {} replied with an invalid key", plugin_key);
}
}
}
Ok(Err(e)) => {
tracing::error!("Plugin {} bailed from responding to {}: {}", plugin.id(), query.key_expr(), e)
}
Err(e) => match e
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| e.downcast_ref::<&str>().copied())
{
Some(e) => tracing::error!("Plugin {} panicked while responding to {}: {}", plugin.id(), query.key_expr(), e),
None => tracing::error!("Plugin {} panicked while responding to {}. The panic message couldn't be recovered.", plugin.id(), query.key_expr()),
},
}
});
}
}
#[cfg(feature = "plugins")]
fn with_extended_string<R, F: FnMut(&mut String) -> R>(
prefix: &mut String,
suffixes: &[&str],
mut closure: F,
) -> R {
let prefix_len = prefix.len();
for suffix in suffixes {
prefix.push_str(suffix);
}
let result = closure(prefix);
prefix.truncate(prefix_len);
result
}