use crate::actions;
use crate::core::{Core, LvarOp};
use crate::events::{
NodeStateEvent, NodeStatus, RawStateEvent, ReplicationInventoryItem, ReplicationStateEvent,
RAW_STATE_TOPIC, REPLICATION_INVENTORY_TOPIC, REPLICATION_NODE_STATE_TOPIC,
REPLICATION_STATE_TOPIC,
};
use crate::items::NodeFilter;
use crate::logs::{LogLevel, LOG_TOPIC};
use crate::services;
use crate::services::emit;
use crate::tools::ErrLogger;
use crate::Error;
use elbus::{
rpc::{rpc_err_str, RpcError, RpcEvent, RpcHandlers, RpcResult},
Frame, FrameKind,
};
use eva_common::acl::{OIDMask, OIDMaskList};
use eva_common::prelude::*;
use log::warn;
use regex::Regex;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Version of the EAPI (bus API) exposed by this module.
pub const EAPI_VERSION: u16 = 1;

/// Returns the EAPI version, i.e. [`EAPI_VERSION`].
///
/// Declared `const` so the value can also be used in constant contexts.
#[inline]
pub const fn get_version() -> u16 {
    EAPI_VERSION
}
/// Bus API front-end: holds a shared handle to the core and serves RPC
/// calls and published frames on its behalf (see the `RpcHandlers`
/// implementation for this type).
pub struct ElbusApi {
    /// Shared core instance every handler delegates to.
    core: Arc<Core>,
}

impl ElbusApi {
    /// Creates an API handler bound to the given core.
    pub fn new(core: Arc<Core>) -> Self {
        ElbusApi { core }
    }
}
/// Call parameters consisting of a single string identifier.
///
/// Shared by the methods which address one object by id: depending on the
/// method the string is parsed as an OID, as an OID mask, or passed as-is
/// as a service id. Unknown fields are rejected.
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct ParamsId<'a> {
    /// The identifier, borrowed from the call payload.
    #[serde(borrow)]
    i: &'a str,
}
/// Call parameters consisting of a single UUID.
///
/// Used by `action.result` and `action.terminate` to address an action.
/// Unknown fields are rejected.
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct ParamsUuid {
    /// UUID of the action.
    u: Uuid,
}
#[async_trait::async_trait]
impl RpcHandlers for ElbusApi {
    /// RPC notifications are ignored.
    async fn handle_notification(&self, _event: RpcEvent) {}

    /// Processes a frame received from the bus.
    ///
    /// Only `Publish` frames which carry a topic are handled, and only while
    /// the core is active. The topic prefix selects the handler:
    ///
    /// * `RAW_STATE_TOPIC` - raw item state update, the topic rest is the OID
    /// * `actions::ACTION_TOPIC` - action event for the action manager
    /// * `REPLICATION_STATE_TOPIC` - replicated item state, the topic rest is
    ///   the OID
    /// * `REPLICATION_INVENTORY_TOPIC` - inventory of a remote source
    /// * `REPLICATION_NODE_STATE_TOPIC` - online/offline/removed node status
    /// * `LOG_TOPIC` - log record, the topic rest is the log level
    ///
    /// Nothing is returned to the sender: malformed frames are reported with
    /// a warning, processing errors are logged via `log_err`.
    async fn handle_frame(&self, frame: Frame) {
        if !self.core.is_active() || frame.kind() != FrameKind::Publish {
            return;
        }
        let topic = match frame.topic() {
            Some(topic) => topic,
            None => return,
        };
        if let Some(raw_topic) = topic.strip_prefix(RAW_STATE_TOPIC) {
            if let Ok(oid) = raw_topic.parse::<OID>() {
                if let Ok(raw) = rmp_serde::from_slice::<RawStateEvent>(frame.payload()) {
                    let _r = self.core.update_state_from_raw(&oid, raw).await.log_err();
                } else {
                    warn!("Invalid payload in raw event: {}", raw_topic);
                }
            } else {
                warn!("Invalid OID in raw event: {}", raw_topic);
            }
        } else if topic.starts_with(actions::ACTION_TOPIC) {
            if let Ok(action_event) =
                rmp_serde::from_slice::<actions::ActionEvent>(frame.payload())
            {
                let _r = self
                    .core
                    .action_manager()
                    .process_event(action_event)
                    .log_err();
            } else {
                warn!("Invalid payload in action event, topic {}", topic);
            }
        } else if let Some(repl_topic) = topic.strip_prefix(REPLICATION_STATE_TOPIC) {
            if let Ok(oid) = repl_topic.parse::<OID>() {
                if let Ok(rpl) = rmp_serde::from_slice::<ReplicationStateEvent>(frame.payload())
                {
                    let _r = self
                        .core
                        .update_state_from_repl(&oid, rpl, frame.sender())
                        .await
                        .log_err();
                } else {
                    warn!(
                        "Invalid payload in replication state event: {}",
                        repl_topic
                    );
                }
            } else {
                warn!("Invalid OID in replication state event: {}", repl_topic);
            }
        } else if let Some(inv_topic) = topic.strip_prefix(REPLICATION_INVENTORY_TOPIC) {
            if let Ok(remote_items) =
                rmp_serde::from_slice::<Vec<ReplicationInventoryItem>>(frame.payload())
            {
                // The list is walked back to front so that, when an OID is
                // listed more than once, its first entry is inserted last and
                // therefore is the one kept in the map.
                let remote_inv: HashMap<_, _> = remote_items
                    .into_iter()
                    .rev()
                    .map(|item| (item.oid.clone(), item))
                    .collect();
                let _r = self
                    .core
                    .process_remote_inventory(remote_inv, inv_topic, frame.sender())
                    .await
                    .log_err();
            } else {
                warn!("Invalid payload in inventory event: {}", inv_topic);
            }
        } else if let Some(ns_topic) = topic.strip_prefix(REPLICATION_NODE_STATE_TOPIC) {
            if let Ok(nst) = rmp_serde::from_slice::<NodeStateEvent>(frame.payload()) {
                match nst.status {
                    NodeStatus::Online => {
                        self.core.mark_source_online(ns_topic, true).await;
                    }
                    NodeStatus::Offline => {
                        self.core.mark_source_online(ns_topic, false).await;
                    }
                    NodeStatus::Removed => self.core.destroy_source(ns_topic).await,
                }
            } else {
                warn!("Invalid payload in node state event: {}", ns_topic);
            }
        } else if let Some(log_topic) = topic.strip_prefix(LOG_TOPIC) {
            // Log frames with an unknown level or a non-UTF-8 message are
            // dropped without any report.
            if let Ok(lvl) = LogLevel::from_str(log_topic) {
                if let Ok(msg) = std::str::from_utf8(frame.payload()) {
                    emit(lvl, frame.sender(), msg);
                }
            }
        }
    }

    /// Dispatches an RPC call by its method name.
    ///
    /// Parameters, when a method takes any, are a MessagePack-encoded map in
    /// the call payload; unknown fields are rejected. Methods which take no
    /// parameters require an empty payload. Results, if any, are returned
    /// MessagePack-encoded.
    ///
    /// # Errors
    ///
    /// * internal error ("not ready") while the core is not active
    /// * params error for a missing, unexpected or invalid payload
    /// * method error for an unknown method name
    /// * any error of the invoked core / manager function
    // A single flat dispatcher: one match arm per method.
    #[allow(clippy::too_many_lines)]
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        if !self.core.is_active() {
            return Err(RpcError::internal(rpc_err_str("not ready")));
        }
        let payload = event.payload();
        // Enables / disables the local items which match the mask given in `i`.
        macro_rules! set_enabled {
            ($value: expr) => {{
                if payload.is_empty() {
                    Err(RpcError::params(rpc_err_str("oid/mask not specified")))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let mask = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.set_local_item_enabled(&mask, $value).await?;
                    Ok(None)
                }
            }};
        }
        // Applies a parameterless lvar operation to the lvar given in `i`.
        macro_rules! lvar_op {
            ($op: expr) => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.lvar_op(&oid, $op).await?;
                    Ok(None)
                }
            };
        }
        match event.parse_method()? {
            // Returns the serialized core object.
            "test" => {
                if payload.is_empty() {
                    Ok(Some(rmp_serde::to_vec_named(&self.core)?))
                } else {
                    Err(RpcError::params(None))
                }
            }
            "save" => {
                if payload.is_empty() {
                    // A failed save is logged only; the caller still gets a
                    // successful reply.
                    let _r = self.core.save().await.log_err();
                    Ok(None)
                } else {
                    Err(RpcError::params(None))
                }
            }
            "log.purge" => {
                if payload.is_empty() {
                    crate::logs::purge_log_records();
                    Ok(None)
                } else {
                    Err(RpcError::params(None))
                }
            }
            "log.get" => {
                #[derive(Deserialize, Default)]
                #[serde(deny_unknown_fields)]
                struct LogGetParams<'a> {
                    level: Option<Value>,
                    time: Option<u32>,
                    limit: Option<u32>,
                    module: Option<&'a str>,
                    #[serde(borrow)]
                    rx: Option<&'a str>,
                }
                // All filter fields are optional, an empty payload means
                // "no filter".
                let params: LogGetParams = if payload.is_empty() {
                    LogGetParams::default()
                } else {
                    rmp_serde::from_read_ref(payload).log_err()?
                };
                // The number of returned records is always limited.
                let limit = params.limit.unwrap_or(1000);
                let log_level = if let Some(lvl) = params.level {
                    Some(lvl.try_into().log_err()?)
                } else {
                    None
                };
                let x: Option<Regex> = if let Some(rx) = params.rx {
                    Some(Regex::new(rx).map_err(Into::<Error>::into).log_err()?)
                } else {
                    None
                };
                let filter = crate::logs::RecordFilter::new(
                    log_level,
                    params.module,
                    x.as_ref(),
                    params.time,
                    Some(limit),
                );
                Ok(Some(rmp_serde::to_vec_named(
                    &crate::logs::get_log_records(filter),
                )?))
            }
            "item.summary" => {
                if payload.is_empty() {
                    Ok(Some(rmp_serde::to_vec_named(
                        &self.core.inventory_stats().await,
                    )?))
                } else {
                    Err(RpcError::params(None))
                }
            }
            "item.create" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.create_local_item(oid).await?;
                    Ok(None)
                }
            }
            "item.destroy" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let mask: OIDMask = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.destroy_local_items(&mask).await;
                    Ok(None)
                }
            }
            "item.deploy" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    #[derive(Deserialize)]
                    #[serde(deny_unknown_fields)]
                    struct ParamsDeploy {
                        items: Vec<Value>,
                        #[serde(default)]
                        replace: bool,
                    }
                    let params: ParamsDeploy = rmp_serde::from_read_ref(payload).log_err()?;
                    self.core
                        .deploy_local_items(params.items, params.replace)
                        .await?;
                    Ok(None)
                }
            }
            "item.undeploy" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    #[derive(Deserialize)]
                    #[serde(deny_unknown_fields)]
                    struct ParamsUndeploy {
                        items: Vec<OID>,
                    }
                    let params: ParamsUndeploy = rmp_serde::from_read_ref(payload).log_err()?;
                    let oids: Vec<&OID> = params.items.iter().collect();
                    self.core.undeploy_local_items(&oids).await;
                    Ok(None)
                }
            }
            "item.get_config" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    // The lookup is restricted to local items.
                    let items = self
                        .core
                        .list_items(&oid.clone().into(), None, None, Some(NodeFilter::Local))
                        .await;
                    match items.first() {
                        Some(item) => Ok(Some(rmp_serde::to_vec_named(
                            &item.serialize_config().ok_or_else(|| {
                                Into::<RpcError>::into(Error::core("unable to serialize config"))
                            })?,
                        )?)),
                        None => Err(Error::not_found(format!("item not found: {}", oid)).into()),
                    }
                }
            }
            "item.list" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsList<'a> {
                    #[serde(borrow)]
                    i: Option<&'a str>,
                    node: Option<&'a str>,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsList = rmp_serde::from_read_ref(payload).log_err()?;
                // No mask given - list everything.
                let mask = if let Some(i) = p.i {
                    i.parse().map_err(Into::<Error>::into)?
                } else {
                    OIDMask::new_any()
                };
                let node_filter = p.node.map(|v| {
                    if v == crate::LOCAL_NODE_ALIAS {
                        NodeFilter::Local
                    } else {
                        NodeFilter::Remote(v)
                    }
                });
                let items = self.core.list_items(&mask, None, None, node_filter).await;
                let system_name = self.core.system_name();
                let result: Vec<BTreeMap<Value, Value>> = items
                    .iter()
                    .map(|v| v.serialize_state_full(system_name))
                    .collect();
                Ok(Some(rmp_serde::to_vec_named(&result)?))
            }
            "item.enable" => {
                set_enabled!(true)
            }
            "item.disable" => {
                set_enabled!(false)
            }
            "item.state" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsState {
                    i: Option<OIDMask>,
                    include: Option<OIDMaskList>,
                    exclude: Option<OIDMaskList>,
                    #[serde(default)]
                    full: bool,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let mut p: ParamsState = rmp_serde::from_read_ref(payload).log_err()?;
                // No mask given - match everything.
                #[allow(clippy::redundant_closure)]
                let mask = p.i.take().unwrap_or_else(|| OIDMask::new_any());
                let items = self
                    .core
                    .list_items(&mask, p.include.as_ref(), p.exclude.as_ref(), None)
                    .await;
                let system_name = self.core.system_name();
                let result: Vec<BTreeMap<Value, Value>> = if p.full {
                    items
                        .iter()
                        .map(|v| v.serialize_state_full(system_name))
                        .collect()
                } else {
                    items
                        .iter()
                        .map(|v| v.serialize_state(system_name))
                        .collect()
                };
                Ok(Some(rmp_serde::to_vec_named(&result)?))
            }
            "action" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsAction<'a> {
                    #[serde(borrow)]
                    i: &'a str,
                    status: ItemStatus,
                    value: Option<Value>,
                    #[serde(default = "crate::actions::default_action_priority")]
                    priority: u8,
                    wait: Option<f64>,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsAction = rmp_serde::from_read_ref(payload).log_err()?;
                let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                // `Duration::from_secs_f64` panics on negative, NaN, infinite
                // or overflowing input. The value comes from the caller, so it
                // is validated here, before the action is launched. NaN and
                // infinities fail the range comparison as well.
                let wait = match p.wait {
                    Some(secs) if secs >= 0.0 && secs < Duration::MAX.as_secs_f64() => {
                        Some(Duration::from_secs_f64(secs))
                    }
                    Some(_) => {
                        return Err(RpcError::params(rpc_err_str("invalid wait timeout")));
                    }
                    None => None,
                };
                let (uuid, trigger) = self
                    .core
                    .action(&oid, p.status, p.value, p.priority)
                    .await?;
                if let Some(max_wait) = wait {
                    // Wait for the trigger no longer than requested; the
                    // current action info is returned in either case.
                    let _r = tokio::time::timeout(max_wait, trigger).await;
                }
                let info = self.core.action_result_serialized(&uuid)?;
                Ok(Some(info))
            }
            "action.result" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsUuid = rmp_serde::from_read_ref(payload).log_err()?;
                let info = self.core.action_result_serialized(&p.u)?;
                Ok(Some(info))
            }
            "action.terminate" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsUuid = rmp_serde::from_read_ref(payload).log_err()?;
                self.core.terminate_action(&p.u).await?;
                Ok(None)
            }
            "action.kill" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                self.core.kill_actions(&oid).await?;
                Ok(None)
            }
            "action.list" => {
                // The filter is optional, an empty payload selects the
                // default one.
                let f: actions::Filter = if payload.is_empty() {
                    actions::Filter::default()
                } else {
                    rmp_serde::from_read_ref(payload).log_err()?
                };
                Ok(Some(
                    self.core
                        .action_manager()
                        .get_actions_filtered_serialized(&f)?,
                ))
            }
            "lvar.set" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsSet<'a> {
                    #[serde(borrow)]
                    i: &'a str,
                    status: ItemStatus,
                    value: Option<Value>,
                }
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsSet = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core
                        .lvar_op(&oid, LvarOp::Set(p.status, p.value))
                        .await?;
                    Ok(None)
                }
            }
            "lvar.reset" => {
                lvar_op!(LvarOp::Reset)
            }
            "lvar.clear" => {
                lvar_op!(LvarOp::Clear)
            }
            "lvar.toggle" => {
                lvar_op!(LvarOp::Toggle)
            }
            "svc.deploy" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsServiceDeploy<'a> {
                    i: &'a str,
                    config: services::Config,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsServiceDeploy = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .deploy_service(p.i, p.config, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.undeploy" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .undeploy_service(p.i, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.restart" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .restart_service(p.i, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.purge" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .purge_service(p.i, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.get_config" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                Ok(Some(rmp_serde::to_vec_named(
                    &self.core.service_manager().get_service_config(p.i)?,
                )?))
            }
            "svc.get_init" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                Ok(Some(rmp_serde::to_vec_named(
                    &self.core.service_manager().get_service_init(
                        p.i,
                        self.core.system_name(),
                        self.core.timeout(),
                    )?,
                )?))
            }
            "svc.list" => {
                if payload.is_empty() {
                    Ok(Some(rmp_serde::to_vec_named(
                        &self
                            .core
                            .service_manager()
                            .list_services(self.core.timeout())
                            .await,
                    )?))
                } else {
                    Err(RpcError::params(None))
                }
            }
            _ => Err(RpcError::method(None)),
        }
    }
}