use crate::actions;
use crate::events::{
RawStateEvent, ReplicationInventoryItem, ReplicationStateEvent, LOCAL_STATE_TOPIC,
REMOTE_ARCHIVE_STATE_TOPIC, REMOTE_STATE_TOPIC,
};
use crate::events::{
RAW_STATE_TOPIC, REPLICATION_INVENTORY_TOPIC, REPLICATION_NODE_STATE_TOPIC,
REPLICATION_STATE_TOPIC,
};
use crate::items::{self, Filter, Inventory, InventoryStats, Item, NodeFilter};
use crate::logs::LOG_TOPIC;
use crate::registry;
use crate::services;
use crate::time::monotonic_ns;
use crate::tools::format_path;
use crate::tools::ErrLogger;
use crate::SLEEP_STEP;
use crate::{EResult, Error};
use atty::Stream;
use elbus::client::AsyncClient;
use elbus::rpc::{Rpc, RpcClient};
use elbus::QoS;
use eva_common::acl::{OIDMask, OIDMaskList};
use eva_common::prelude::*;
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use uuid::Uuid;
/// Operation applied to an lvar by `Core::lvar_op`.
pub enum LvarOp {
/// Force the given status, passing the optional value along.
Set(ItemStatus, Option<Value>),
/// Force status 1, passing `None` as the value.
Reset,
/// Force status 0, passing `None` as the value.
Clear,
/// Force status 1 if the current status is 0, otherwise status 0 (value passed as `None`).
Toggle,
}
// Error text returned when a state update is requested for an lmacro item.
const ERR_MSG_STATE_LMACRO: &str = "can not update state for lmacro";
/// Publishes a serialized item state to the local state topic.
///
/// The topic is `LOCAL_STATE_TOPIC` followed by `oid_path`; the payload is the
/// state map encoded as named MessagePack. The message is sent with `QoS::No`.
///
/// # Errors
///
/// Returns an error if the state can not be serialized or the publish fails.
#[inline]
async fn announce_local_state<C>(
    oid_path: &str,
    state: &BTreeMap<Value, Value>,
    client: &Arc<Mutex<C>>,
) -> EResult<()>
where
    C: elbus::client::AsyncClient + ?Sized,
{
    // the bus client stays locked for the whole publish call
    let mut bus = client.lock().await;
    let topic = format!("{}{}", LOCAL_STATE_TOPIC, oid_path);
    let payload = rmp_serde::to_vec_named(&state)?;
    bus.publish(&topic, payload.into(), QoS::No).await?;
    Ok(())
}
/// Publishes a serialized state of a remote item.
///
/// When `current` is true the state goes to `REMOTE_STATE_TOPIC`, otherwise to
/// `REMOTE_ARCHIVE_STATE_TOPIC`; `oid_path` is appended to the chosen prefix.
/// The payload is the state map encoded as named MessagePack, sent with
/// `QoS::No`.
///
/// # Errors
///
/// Returns an error if the state can not be serialized or the publish fails.
#[inline]
async fn announce_remote_state<C>(
    oid_path: &str,
    state: &BTreeMap<Value, Value>,
    current: bool,
    client: &Arc<Mutex<C>>,
) -> EResult<()>
where
    C: elbus::client::AsyncClient + ?Sized,
{
    // the bus client stays locked for the whole publish call
    let mut bus = client.lock().await;
    let prefix = if current {
        REMOTE_STATE_TOPIC
    } else {
        REMOTE_ARCHIVE_STATE_TOPIC
    };
    let topic = format!("{}{}", prefix, oid_path);
    let payload = rmp_serde::to_vec_named(&state)?;
    bus.publish(&topic, payload.into(), QoS::No).await?;
    Ok(())
}
/// Stores an item state in the registry under `registry::R_STATE` / `oid_path`.
///
/// # Errors
///
/// Propagates the registry error if the key can not be set.
#[inline]
async fn save_item_state(
    oid_path: &str,
    state: BTreeMap<Value, Value>,
    rpc: &RpcClient,
) -> EResult<()> {
    registry::key_set(registry::R_STATE, oid_path, state, rpc)
        .await
        .map(|_| ())
        .map_err(Into::into)
}
/// Stores an item configuration in the registry under
/// `registry::R_INVENTORY` / `oid_path`.
///
/// # Errors
///
/// Propagates the registry error if the key can not be set.
#[inline]
async fn save_item_config(
    oid_path: &str,
    config: BTreeMap<Value, Value>,
    rpc: &RpcClient,
) -> EResult<()> {
    registry::key_set(registry::R_INVENTORY, oid_path, config, rpc)
        .await
        .map(|_| ())
        .map_err(Into::into)
}
/// Deletes the registry keys of an item: first its state, then its
/// inventory (configuration) entry.
///
/// # Errors
///
/// Stops at and returns the first registry error; if the state key can not
/// be deleted, the inventory key is left untouched.
#[inline]
async fn destroy_registry_item(oid_path: &str, rpc: &RpcClient) -> EResult<()> {
    for top_key in [registry::R_STATE, registry::R_INVENTORY] {
        registry::key_delete(top_key, oid_path, rpc).await?;
    }
    Ok(())
}
// Builds the pair `(basic serialized state, optional db state)` for an item.
//
// The first element is `$item.serialize_state_basic_from($state)`. The second is
// `Some(items::serialize_db_state_from($state))` when `$instant_save` is true and
// `None` otherwise.
//
// NOTE: `$state` is expanded twice, so an expression with side effects (e.g. one
// that locks a mutex) is evaluated twice when `$instant_save` is true.
macro_rules! prepare_state_data {
($item: expr, $state: expr, $instant_save: expr) => {{
let s_st = $item.serialize_state_basic_from($state);
let db_st = if $instant_save {
Some(items::serialize_db_state_from($state))
} else {
None
};
(s_st, db_st)
}};
}
/// The node core: configuration loaded from the registry plus runtime state
/// (inventory, RPC client, action and service managers).
///
/// Unknown fields in the deserialized configuration are rejected.
#[derive(Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Core {
/// Never deserialized; filled from `crate::get_version_owned`.
#[serde(skip_deserializing, default = "crate::get_version_owned")]
version: String,
/// Never deserialized; filled from `crate::get_build`.
#[serde(skip_deserializing, default = "crate::get_build")]
build: u64,
/// Never deserialized; filled from `crate::eapi::get_version`.
#[serde(skip_deserializing, default = "crate::eapi::get_version")]
eapi_version: u16,
/// Never deserialized; assigned by `set_boot_id` and used when generating IEIDs.
#[serde(skip_deserializing)]
boot_id: u64,
/// Skipped by serde in both directions; assigned by `update_paths`.
#[serde(skip)]
dir_eva: String,
/// Name of this system; passed to item state serialization and used in log output.
system_name: String,
/// Deserialized only; rewritten by `update_paths` via `format_path` using `dir_eva`.
#[serde(skip_serializing)]
pid_file: String,
/// When true, new item states are written to the registry immediately;
/// otherwise the item is scheduled for a later `save`.
instant_save: bool,
/// Deserialized only, with `crate::tools::de_float_as_duration`.
#[serde(
skip_serializing,
deserialize_with = "crate::tools::de_float_as_duration"
)]
suicide_timeout: Duration,
/// Default timeout (deserialized only, with `crate::tools::de_float_as_duration`);
/// used for actions and service stop.
#[serde(
skip_serializing,
deserialize_with = "crate::tools::de_float_as_duration"
)]
timeout: Duration,
/// Deserialized only; handed to the action manager via `set_keep_for` in `new_from_db`.
#[serde(
skip_serializing,
deserialize_with = "crate::tools::de_float_as_duration"
)]
keep_action_history: Duration,
/// Configured number of workers, exposed by `workers()`.
workers: usize,
/// Never deserialized; defaults to `std::process::id` and is written by `write_pid_file`.
#[serde(skip_deserializing, default = "std::process::id")]
pid: u32,
/// Unit placeholder; serialized through `crate::tools::serialize_time_now`.
#[serde(
skip_deserializing,
serialize_with = "crate::tools::serialize_time_now"
)]
time: (),
/// Set to `Instant::now` on creation; serialized through `crate::tools::serialize_uptime`.
#[serde(
skip_deserializing,
default = "Instant::now",
serialize_with = "crate::tools::serialize_uptime"
)]
uptime: Instant,
/// Activity flag: set by `mark_loaded`, cleared by the termination signal
/// handlers, polled by `block`.
#[serde(skip)]
active: Arc<atomic::AtomicBool>,
/// Files removed by `block` during shutdown.
#[serde(skip)]
files_to_remove: Vec<String>,
/// The item inventory.
#[serde(skip)]
inventory: Arc<RwLock<Inventory>>,
/// RPC client; `None` until `set_rpc` is called.
#[serde(skip)]
rpc: RwLock<Option<Arc<RpcClient>>>,
/// Source ids currently locked by `lock_source`.
#[serde(skip)]
inv_process: std::sync::Mutex<HashSet<String>>,
/// OIDs whose state is pending a deferred `save`.
#[serde(skip)]
scheduled_saves: std::sync::Mutex<HashSet<OID>>,
/// Manager of unit actions.
#[serde(skip)]
action_manager: Arc<actions::Manager>,
/// Manager of services.
#[serde(skip)]
service_manager: services::Manager,
}
// Spawns a tokio task which listens for the signal `$kind` and, every time it is
// received, stores `false` into the `$active` flag.
//
// `$can_log` is evaluated inside the task after each received signal: when true a
// debug message is logged, otherwise console logging is disabled. If the signal
// can not be bound, the error is logged and the task ends.
macro_rules! handle_term_signal {
($kind: expr, $active: expr, $can_log: expr) => {
tokio::spawn(async move {
trace!("starting handler for {:?}", $kind);
loop {
// the signal listener is re-created on every iteration
match signal($kind) {
Ok(mut v) => {
v.recv().await;
}
Err(e) => {
error!("Unable to bind to signal {:?}: {}", $kind, e);
break;
}
}
if $can_log {
debug!("got termination signal");
} else {
crate::logs::disable_console_log();
}
$active.store(false, atomic::Ordering::SeqCst);
}
});
};
}
// Spawns a tokio task which listens for the signal `$kind` and only logs a trace
// message when it arrives, so the signal has no other effect.
//
// If the signal can not be bound, the error is logged and the task ends.
macro_rules! ignore_term_signal {
($kind: expr) => {
tokio::spawn(async move {
trace!("starting empty handler for {:?}", $kind);
loop {
// the signal listener is re-created on every iteration
match signal($kind) {
Ok(mut v) => {
v.recv().await;
}
Err(e) => {
error!("Unable to bind to signal {:?}: {}", $kind, e);
break;
}
}
trace!("got termination signal, ignoring");
}
});
};
}
impl Core {
pub fn new_from_db(db: &mut yedb::Database, dir_eva: &str) -> EResult<Self> {
let mut core: Core =
serde_json::from_value(db.key_get(®istry::format_config_key("core"))?)?;
core.update_paths(dir_eva);
core.action_manager.set_keep_for(core.keep_action_history);
Ok(core)
}
/// Loads all item configurations and saved states from the registry database
/// into the inventory.
///
/// Registry keys are stripped of their top-key prefix (plus one separator
/// character) to obtain the OID path. Every inventory entry is appended
/// together with its saved state, if one exists.
///
/// # Errors
///
/// Returns an error if the database can not be read, the inventory lock can
/// not be taken without waiting, an OID or state can not be parsed, or an item
/// can not be appended.
pub fn load_inventory(&self, db: &mut yedb::Database) -> EResult<()> {
    let inv_key = registry::format_top_key(registry::R_INVENTORY);
    let inv_offs = inv_key.len() + 1;
    info!("loading inventory");
    let configs = db.key_get_recursive(&inv_key)?;
    let st_key = registry::format_top_key(registry::R_STATE);
    let st_offs = st_key.len() + 1;
    info!("loading states");
    let saved = db.key_get_recursive(&st_key)?;
    let mut states: HashMap<String, serde_json::Value> = saved
        .into_iter()
        .map(|(path, value)| (path[st_offs..].to_owned(), value))
        .collect();
    info!("creating inventory");
    let mut inventory = self.inventory.try_write()?;
    for (path, config) in configs {
        let oid_str = &path[inv_offs..];
        let oid: OID = oid_str.parse()?;
        debug!("loading {oid}");
        let state = states
            .remove(oid_str)
            .map(|raw| serde_json::from_value(raw))
            .transpose()?;
        inventory.append_item_from_value(Some(&oid), config, state, self.boot_id, true, false)?;
    }
    Ok(())
}
/// Creates a new IEID from the current boot id and the monotonic clock (ns).
#[inline]
fn generate_ieid(&self) -> IEID {
    let boot_id = self.boot_id;
    IEID::new(boot_id, monotonic_ns())
}
/// Returns the boot id of the core.
#[inline]
pub fn boot_id(&self) -> u64 {
self.boot_id
}
/// Returns the inventory statistics, taken under the inventory read lock.
#[inline]
pub async fn inventory_stats(&self) -> InventoryStats {
    let inventory = self.inventory.read().await;
    inventory.stats()
}
/// Tries to mark the source as being processed.
///
/// Returns `true` if the source was not locked and is now locked by the
/// caller, `false` if it is already locked.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn lock_source(&self, source_id: &str) -> bool {
    let mut busy = self.inv_process.lock().unwrap();
    let acquired = !busy.contains(source_id);
    if acquired {
        busy.insert(source_id.to_owned());
    }
    acquired
}
/// Removes the processing lock of the source (no-op if it is not locked).
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
#[inline]
pub fn unlock_source(&self, source_id: &str) {
    let mut busy = self.inv_process.lock().unwrap();
    busy.remove(source_id);
}
/// Saves states of all items scheduled for a deferred save.
///
/// The schedule is drained first; items which no longer exist or have no
/// database state are skipped.
///
/// # Errors
///
/// Stops at the first registry error (which is logged) and returns it. The
/// failed OID and all OIDs not processed yet are put back into the schedule,
/// so a later call retries them instead of silently losing their states.
///
/// # Panics
///
/// Panics if the RPC client is not set or the schedule mutex is poisoned.
pub async fn save(&self) -> EResult<()> {
    let oids: Vec<OID> = self.scheduled_saves.lock().unwrap().drain().collect();
    if oids.is_empty() {
        return Ok(());
    }
    let rpc_c = self.rpc.read().await;
    let rpc = rpc_c.as_ref().unwrap();
    let mut pending = oids.into_iter();
    while let Some(oid) = pending.next() {
        let item = self.inventory.read().await.get_item(&oid);
        if let Some(s_state) = item.and_then(|item| item.serialize_db_state()) {
            let saved = save_item_state(&oid.to_path(), s_state, rpc)
                .await
                .log_err();
            if let Err(e) = saved {
                // the schedule was drained above: restore everything not yet saved
                let mut scheduled = self.scheduled_saves.lock().unwrap();
                scheduled.insert(oid);
                scheduled.extend(pending);
                return Err(e);
            }
        }
    }
    Ok(())
}
pub async fn create_local_item(&self, oid: OID) -> EResult<Item> {
let tp = oid.kind();
let ieid = if tp == EvaItemKind::Lmacro {
None
} else {
Some(self.generate_ieid())
};
let result = self.inventory.write().await.create_item(oid, ieid, None);
match result {
Ok(item) => {
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
let oid_path = item.oid().to_path();
save_item_config(
&oid_path,
item.serialize_config()
.ok_or_else(|| Error::core("unable to serialize config"))?,
rpc,
)
.await?;
if let Some(stc) = item.state() {
let (s_state, db_st) =
prepare_state_data!(item, &stc.lock().unwrap(), self.instant_save);
self.process_new_state(item.oid(), s_state, db_st, rpc)
.await?;
}
info!("item created: {}", item.oid());
Ok(item)
}
v => v.log_err(),
}
}
/// Announces a new local item state and persists it.
///
/// The announcement is best-effort: a failure is logged and ignored. If
/// `db_st` is provided it is written to the registry immediately, otherwise
/// the item is scheduled for a deferred save.
///
/// # Errors
///
/// Returns (and logs) the registry error if the immediate save fails.
async fn process_new_state(
    &self,
    oid: &OID,
    s_state: BTreeMap<Value, Value>,
    db_st: Option<BTreeMap<Value, Value>>,
    rpc: &RpcClient,
) -> EResult<()> {
    let oid_path = oid.to_path();
    let _r = announce_local_state(&oid_path, &s_state, &rpc.client())
        .await
        .log_err();
    match db_st {
        Some(db_state) => save_item_state(&oid_path, db_state, rpc).await.log_err()?,
        None => self.schedule_save(oid),
    }
    Ok(())
}
/// Destroys all local items matching the mask.
///
/// Each item is removed from the inventory, its registry keys are deleted
/// (failures are logged and ignored) and any deferred save is cancelled.
///
/// # Panics
///
/// Panics if the RPC client is not set.
#[inline]
pub async fn destroy_local_items(&self, mask: &OIDMask) {
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    let inventory = self.inventory.read().await;
    let doomed = inventory.get_items_by_mask(mask, &Filter::default().node(NodeFilter::Local));
    // the read lock must be released before the write lock is requested below
    drop(inventory);
    for item in doomed {
        let oid = item.oid();
        let _removed = self.inventory.write().await.remove_item(oid);
        let _r = destroy_registry_item(&oid.to_path(), rpc).await.log_err();
        info!("item destroyed: {}", oid);
        self.unschedule_save(oid);
    }
}
#[inline]
pub async fn deploy_local_items(&self, configs: Vec<Value>, replace: bool) -> EResult<()> {
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
for val in configs {
let item = self.inventory.write().await.append_item_from_value(
None,
val.try_into()?,
None,
self.boot_id,
replace,
false,
)?;
let oid_path = item.oid().to_path();
save_item_config(
&oid_path,
item.serialize_config()
.ok_or_else(|| Error::core("unable to serialize config"))?,
rpc,
)
.await?;
if let Some(stc) = item.state() {
let (s_state, db_st) =
prepare_state_data!(item, &stc.lock().unwrap(), self.instant_save);
self.process_new_state(item.oid(), s_state, db_st, rpc)
.await?;
}
}
Ok(())
}
/// Asks the action manager to terminate the action with the given UUID,
/// using the core default timeout.
#[inline]
pub async fn terminate_action(&self, uuid: &Uuid) -> EResult<()> {
self.action_manager
.terminate_action(uuid, self.timeout)
.await
}
pub async fn kill_actions(&self, oid: &OID) -> EResult<()> {
let target = {
let inv = self.inventory.read().await;
if let Some(unit) = inv.get_item(oid) {
if let Some(action_params) = unit.action() {
action_params.svc().to_owned()
} else {
return Err(Error::failed("unit action not configured"));
}
} else {
return Err(Error::not_found(format!("unit not found: {}", oid)));
}
};
self.action_manager
.kill_actions(oid, &target, self.timeout)
.await
}
#[inline]
pub fn action_result_serialized(&self, uuid: &Uuid) -> EResult<Vec<u8>> {
if let Some(info) = self.action_manager.get_action_serialized(uuid)? {
Ok(info)
} else {
Err(Error::not_found("action not found"))
}
}
/// Creates and launches a unit action.
///
/// For a unit which has a source, the action is addressed to the source node /
/// service with no timeout and no config; otherwise the unit action parameters
/// are used (timeout falls back to the core default). Before launching, the unit
/// `act` counter is incremented and the state is announced locally. The action
/// itself is launched in a spawned task, which decrements the counter and
/// announces the state again once the action is finished or timed out.
///
/// Returns the action UUID and its listener after the `l_accepted` listener
/// has fired.
///
/// # Errors
///
/// - invalid data: the OID is not a unit
/// - not found: the unit does not exist
/// - access: the unit is disabled
/// - failed: the unit has neither a source nor action parameters
/// - an error of the state announcement made before the launch
///
/// # Panics
///
/// Panics if the RPC client is not set or the unit state mutex is poisoned.
#[allow(clippy::too_many_lines)]
pub async fn action(
&self,
oid: &OID,
status: ItemStatus,
value: Option<Value>,
priority: u8,
) -> EResult<(Uuid, triggered::Listener)> {
if oid.kind() != EvaItemKind::Unit {
return Err(Error::invalid_data("item is not a unit"));
}
let (action, listener, core_listener) = {
// the inventory read lock is held for the whole block, including the announce awaits
let inv = self.inventory.read().await;
if let Some(unit) = inv.get_item(oid) {
if !unit.is_enabled() {
return Err(Error::access("unit is disabled"));
}
if let Some(source) = unit.source() {
// the unit has a source: target is the source node / service
let (action, listener, core_listener) =
actions::Action::new_unit(actions::UnitAction {
oid,
status,
value,
timeout: None,
priority,
config: None,
node: Some(source.node().to_owned()),
target: source.svc().to_owned(),
});
// the std mutex guard is confined to this expression and released before the await
let s_st = if let Some(stc) = unit.state() {
let mut state = stc.lock().unwrap();
state.act_incr(self.generate_ieid());
Some(unit.serialize_state_basic_from(&state))
} else {
None
};
if let Some(s_state) = s_st {
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
announce_local_state(&unit.oid().to_path(), &s_state, &rpc.client())
.await?;
}
(action, listener, core_listener)
} else if let Some(action_params) = unit.action() {
// no source: use the unit's own action parameters
let (action, listener, core_listener) =
actions::Action::new_unit(actions::UnitAction {
oid,
status,
value,
timeout: Some(action_params.timeout().unwrap_or(self.timeout)),
priority,
config: action_params.config().map(Clone::clone),
node: None,
target: action_params.svc().to_owned(),
});
let s_st = if let Some(stc) = unit.state() {
let mut state = stc.lock().unwrap();
state.act_incr(self.generate_ieid());
Some(unit.serialize_state_basic_from(&state))
} else {
None
};
if let Some(s_state) = s_st {
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
announce_local_state(&unit.oid().to_path(), &s_state, &rpc.client())
.await?;
}
(action, listener, core_listener)
} else {
return Err(Error::failed("unit action not configured"));
}
} else {
return Err(Error::not_found("unit not found"));
}
};
let (t_accepted, l_accepted) = triggered::trigger();
// everything the spawned task needs is cloned / copied, as the task can not borrow self
let action_manager = self.action_manager.clone();
let uuid = *action.uuid();
let inventory = self.inventory.clone();
let oid = oid.clone();
let boot_id = self.boot_id;
let default_timeout = self.timeout;
tokio::spawn(async move {
let timeout = action.timeout().unwrap_or(default_timeout);
let _r = action_manager
.launch_action(action, t_accepted, timeout)
.await
.log_err();
// wait for the core listener; no signal within the timeout marks the action as timed out
if tokio::time::timeout(timeout, core_listener).await.is_err() {
action_manager.mark_action_timed_out(&uuid);
}
let inv = inventory.read().await;
// the unit may have been removed meanwhile; in that case nothing is announced
if let Some(unit) = inv.get_item(&oid) {
let s_st = if let Some(stc) = unit.state() {
let mut state = stc.lock().unwrap();
let ieid = IEID::new(boot_id, monotonic_ns());
state.act_decr(ieid);
Some(unit.serialize_state_basic_from(&state))
} else {
None
};
if let Some(s_state) = s_st {
let rpc_c = action_manager.rpc().read().await;
let rpc = rpc_c.as_ref().unwrap();
let _r = announce_local_state(&unit.oid().to_path(), &s_state, &rpc.client())
.await
.log_err();
}
}
});
// `t_accepted` was handed to `launch_action`; wait for it before returning
l_accepted.await;
Ok((uuid, listener))
}
/// Removes the given items from the inventory and the registry.
///
/// For every removed item the registry keys are deleted (failures are logged
/// and ignored); a warning is logged for items which do not exist. A deferred
/// save is cancelled in both cases.
///
/// # Panics
///
/// Panics if the RPC client is not set.
#[inline]
pub async fn undeploy_local_items(&self, oids: &[&OID]) {
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    for &oid in oids {
        let removed = self.inventory.write().await.remove_item(oid);
        match removed {
            Some(_) => {
                let _r = destroy_registry_item(&oid.to_path(), rpc).await.log_err();
                info!("item destroyed: {}", oid);
            }
            None => warn!("item not found: {}", oid),
        }
        self.unschedule_save(oid);
    }
}
/// Passes the online flag of a source to the inventory (under its read lock).
#[inline]
pub async fn mark_source_online(&self, source_id: &str, online: bool) {
    let inventory = self.inventory.read().await;
    inventory.mark_source_online(source_id, online);
}
/// Destroys a source and removes all its items from the inventory.
///
/// The source is marked as destroyed first, then the method waits (polling
/// every `SLEEP_STEP`) until the source processing lock can be taken.
pub async fn destroy_source(&self, source_id: &str) {
    let source = self
        .inventory
        .read()
        .await
        .get_or_create_source(source_id, "");
    source.mark_destroyed();
    loop {
        if self.lock_source(source_id) {
            break;
        }
        sleep(SLEEP_STEP).await;
    }
    let owned = self.inventory.read().await.get_items_by_source(source_id);
    for (oid, _item) in owned.into_iter().flatten() {
        let _r = self.inventory.write().await.remove_item(&oid);
    }
    info!("source destroyed: {}", source_id);
    self.unlock_source(source_id);
}
pub async fn lvar_op(&self, oid: &OID, op: LvarOp) -> EResult<()> {
if oid.kind() != EvaItemKind::Lvar {
return Err(Error::not_implemented(
"Lvar ops can be applied to Lvars only",
));
}
let lvar = self
.inventory
.read()
.await
.get_item(oid)
.ok_or_else(|| Error::not_found(oid))?;
if !lvar.is_enabled() {
return Err(Error::access(format!("Lvar {} is disabled", oid)));
}
if let Some(st) = lvar.state() {
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
let (s_state, db_st) = {
let mut state = st.lock().unwrap();
let ieid = self.generate_ieid();
match op {
LvarOp::Set(status, value) => {
trace!("setting lvar {} state to {} {:?}", oid, status, value);
state.force_set_state(status, value, ieid);
}
LvarOp::Reset => {
trace!("resetting lvar {} state", oid);
state.force_set_state(1, None, ieid);
}
LvarOp::Clear => {
trace!("clearing lvar {} state", oid);
state.force_set_state(0, None, ieid);
}
LvarOp::Toggle => {
trace!("toggling lvar {} state", oid);
let st = state.status();
state.force_set_state(if st == 0 { 1 } else { 0 }, None, ieid);
}
}
prepare_state_data!(lvar, &state, self.instant_save)
};
self.process_new_state(oid, s_state, db_st, rpc).await?;
Ok(())
} else {
Err(Error::core(format!("Lvar {} has no state", oid)))
}
}
/// Updates the state of a local item from a raw state event.
///
/// The event is silently ignored (with `Ok`) when the item is disabled, has
/// no state, is an lvar with status 0, or when `set_from_raw` reports that
/// nothing has been changed. Otherwise the new state is announced and saved.
///
/// # Errors
///
/// - not implemented: the OID is an lmacro
/// - not found: the item does not exist
/// - busy: the item has a source (is remote)
/// - an error of the new state processing
///
/// # Panics
///
/// Panics if the RPC client is not set or the state mutex is poisoned.
pub async fn update_state_from_raw(&self, oid: &OID, raw: RawStateEvent) -> EResult<()> {
    let tp = oid.kind();
    if tp == EvaItemKind::Lmacro {
        return Err(Error::not_implemented(ERR_MSG_STATE_LMACRO));
    }
    let item = self
        .inventory
        .read()
        .await
        .get_item(oid)
        .ok_or_else(|| Error::not_found(oid))?;
    if item.source().is_some() {
        return Err(Error::busy(format!(
            "unable to update item {} from raw event: remote",
            oid
        )));
    }
    if !item.is_enabled() {
        debug!("ignoring state from raw event for {} - disabled", oid);
        return Ok(());
    }
    let state_cell = match item.state() {
        Some(cell) => cell,
        None => {
            warn!("no state property in {}", oid);
            return Ok(());
        }
    };
    debug!(
        "setting state from raw event for {}, status: {}, value: {:?}",
        oid,
        raw.status(),
        raw.value()
    );
    let (s_state, db_st) = {
        // the std mutex guard lives only inside this block (no awaits here)
        let mut state = state_cell.lock().unwrap();
        if tp == EvaItemKind::Lvar && state.status() == 0 {
            return Ok(());
        }
        if !state.set_from_raw(raw, item.logic(), item.oid(), self.boot_id) {
            return Ok(());
        }
        prepare_state_data!(item, &state, self.instant_save)
    };
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    self.process_new_state(item.oid(), s_state, db_st, rpc)
        .await?;
    Ok(())
}
/// Synchronizes the inventory with the inventory payload received from a remote
/// source.
///
/// Items of the source which are missing in the payload are removed, states of
/// known items are updated when the incoming IEID is newer, and unknown items (or
/// items whose meta differs) are created. Every applied state is announced to the
/// remote state topic (announce failures are logged and ignored). Processing is
/// stopped as soon as the source is marked as destroyed.
///
/// If the source is already being processed, the payload is ignored with a
/// warning and `Ok` is returned.
///
/// # Errors
///
/// Returns "invalid params" if `source_id` starts with a dot.
///
/// # Panics
///
/// Panics if the RPC client is not set or an item state mutex is poisoned.
#[allow(clippy::too_many_lines)]
pub async fn process_remote_inventory(
&self,
remote_inv: HashMap<OID, ReplicationInventoryItem>,
source_id: &str,
sender: &str,
) -> EResult<()> {
// Validates which state fields are present for the item kind: lvars and sensors
// must have no `act`, lmacros must have no status / value / act.
#[inline]
fn check_state(item: &ReplicationInventoryItem) -> bool {
let tp = item.oid.kind();
match tp {
EvaItemKind::Lvar | EvaItemKind::Sensor => {
if item.act.is_some() {
warn!("invalid repl item {} state", item.oid);
return false;
}
}
EvaItemKind::Lmacro => {
if item.status.is_some() || item.value.is_some() || item.act.is_some() {
warn!("invalid repl item {} state", item.oid);
return false;
}
}
EvaItemKind::Unit => {}
}
true
}
if source_id.starts_with('.') {
return Err(Error::invalid_params(
"source ids starting with dots are reserved, ignoring incoming payload",
));
}
// the lock taken here is released at the end of the method; there are no early
// returns in between
if !self.lock_source(source_id) {
warn!(
"source {} inventory processor is busy. ignoring incoming payload",
source_id
);
return Ok(());
}
let (existing_items, source) = {
let inv = self.inventory.read().await;
(
inv.get_items_by_source(source_id),
inv.get_or_create_source(source_id, sender),
)
};
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
if let Some(existing) = existing_items {
// step 1: remove items which are no longer present in the remote inventory
for oid in existing.keys() {
if source.is_destroyed() {
break;
}
if !remote_inv.contains_key(oid.as_ref()) {
debug!(
"removing remote item {}, node: {} from rpl {}",
oid, source_id, sender
);
let _r = self.inventory.write().await.remove_item(oid);
}
}
// step 2: update known items / create new ones
for remote in remote_inv.into_values() {
if source.is_destroyed() {
break;
}
if let Some(ex) = existing.get(&remote.oid) {
if ex.source().is_none() {
warn!(
"attempt to modify local item {} from rpl by {}, ignored",
ex.oid(),
sender
);
continue;
}
// same meta: the item is kept, only the enabled flag and the state are updated
if ex.meta() == remote.meta.as_ref() {
debug!("setting state for {} from rpl inv {}", remote.oid, sender);
if ex.is_enabled() != remote.enabled {
ex.set_enabled(remote.enabled);
}
if check_state(&remote) {
if let Some(st) = ex.state() {
if let Some(s_state) = {
let mut state = st.lock().unwrap();
let rse: ReplicationStateEvent = remote.into();
// apply the incoming state only if it is newer than the current one
if rse.ieid() > state.ieid() {
state.set_from_rse(rse);
Some(ex.serialize_state_basic_from(&state))
} else {
None
}
} {
let _r = announce_remote_state(
&ex.oid().to_path(),
&s_state,
true,
&rpc.client(),
)
.await
.log_err();
}
}
}
continue;
}
};
// reached for items unknown for this source and for known items whose meta differs
// NOTE(review): `check_state` is not applied here, unlike the branch below which
// handles a source without existing items - confirm this is intended
debug!(
"creating remote item {}, node: {} from rpl {}",
remote.oid, source_id, sender
);
if let Ok(item) = self
.inventory
.write()
.await
.append_remote_item(remote, source.clone())
.log_err()
{
let s_state = item.serialize_state(self.system_name());
let _r =
announce_remote_state(&item.oid().to_path(), &s_state, true, &rpc.client())
.await
.log_err();
}
}
} else {
// the source has no items yet: everything with a valid state is created
for remote in remote_inv.into_values() {
if source.is_destroyed() {
break;
}
debug!(
"creating remote item {}, node: {} from rpl {}",
remote.oid, source_id, sender
);
if check_state(&remote) {
if let Ok(item) = self
.inventory
.write()
.await
.append_remote_item(remote, source.clone())
.log_err()
{
let s_state = item.serialize_state(self.system_name());
let _r = announce_remote_state(
&item.oid().to_path(),
&s_state,
true,
&rpc.client(),
)
.await
.log_err();
}
}
}
}
debug!("source inventory processed: {}", source_id);
self.unlock_source(source_id);
Ok(())
}
/// Processes a replication state event for an item.
///
/// If the event IEID is newer than the IEID of the item state, the state is
/// updated and announced to the remote state topic. Otherwise the item state is
/// left untouched and the event state is announced to the remote archive state
/// topic. Announce failures are logged and ignored. If the item has no state, a
/// warning is logged and `Ok` is returned.
///
/// # Errors
///
/// - not implemented: the OID is an lmacro
/// - invalid data: the event for an lvar / sensor contains `act`
/// - not found: the item does not exist
/// - busy: the item has a source and the event node is missing or differs from
///   the source node
///
/// # Panics
///
/// Panics if the RPC client is not set or the state mutex is poisoned.
pub async fn update_state_from_repl(
&self,
oid: &OID,
rse: ReplicationStateEvent,
sender: &str,
) -> EResult<()> {
let tp = oid.kind();
match tp {
EvaItemKind::Lmacro => return Err(Error::not_implemented(ERR_MSG_STATE_LMACRO)),
EvaItemKind::Lvar | EvaItemKind::Sensor => {
if rse.act().is_some() {
return Err(Error::invalid_data(format!(
"invalid state payload for {}",
oid
)));
}
}
EvaItemKind::Unit => {}
}
let item = self
.inventory
.read()
.await
.get_item(oid)
.ok_or_else(|| Error::not_found(oid))?;
if let Some(state) = item.state() {
debug!("setting state from repl event for {}, from {}", oid, sender);
// NOTE(review): items without a source (local) skip the node check and are
// updated as well - confirm this is intended
if let Some(source) = item.source() {
if rse.node().as_ref().map_or(true, |n| *n != source.node()) {
return Err(Error::busy(format!(
"unable to set item state {} from rpl event from {}: node differs",
oid, sender
)));
}
}
// `current` tells whether the event has been applied to the item state
let (s_state, current) = {
let mut state = state.lock().unwrap();
if rse.ieid() > state.ieid() {
state.set_from_rse(rse);
(item.serialize_state_from(&state, self.system_name()), true)
} else {
// outdated event: serialize the event itself, the item state is not modified
(
item.serialize_state_from(&rse.into(), self.system_name()),
false,
)
}
};
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
let oid_path = item.oid().to_path();
let _r = announce_remote_state(&oid_path, &s_state, current, &rpc.client())
.await
.log_err();
} else {
warn!("no state property in {}", oid);
}
Ok(())
}
/// Adds the OID to the set of items waiting for a deferred state save.
///
/// # Panics
///
/// Panics if the schedule mutex is poisoned.
#[inline]
fn schedule_save(&self, oid: &OID) {
    let mut scheduled = self.scheduled_saves.lock().unwrap();
    scheduled.insert(oid.clone());
}
/// Removes the OID from the set of items waiting for a deferred state save.
///
/// # Panics
///
/// Panics if the schedule mutex is poisoned.
#[inline]
fn unschedule_save(&self, oid: &OID) {
    let mut scheduled = self.scheduled_saves.lock().unwrap();
    scheduled.remove(oid);
}
/// Sets the enabled flag of all items which match the mask and have no source.
///
/// The inventory read lock is held while the flags are being set. Always
/// returns `Ok`.
#[inline]
pub async fn set_local_item_enabled(&self, mask: &OIDMask, value: bool) -> EResult<()> {
    let inventory = self.inventory.read().await;
    for item in inventory.get_items_by_mask(mask, &Filter::default()) {
        if item.source().is_none() {
            item.set_enabled(value);
        }
    }
    Ok(())
}
/// Announces states of all local items which have a serializable state.
///
/// Announce failures are logged and ignored.
///
/// # Panics
///
/// Panics if the RPC client is not set.
pub async fn announce_local(&self) {
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    let local_items = self.inventory.read().await.list_local_items();
    for item in local_items {
        let s_state = match item.serialize_state_basic() {
            Some(s_state) => s_state,
            None => continue,
        };
        let _r = announce_local_state(&item.oid().to_path(), &s_state, &rpc.client())
            .await
            .log_err();
    }
}
/// Returns inventory items matching the mask and the optional filters.
///
/// `include` / `exclude` are optional OID mask lists and `source_id` is an
/// optional node filter; filters which are `None` are not applied.
#[inline]
pub async fn list_items<'a>(
    &self,
    mask: &OIDMask,
    include: Option<&OIDMaskList>,
    exclude: Option<&OIDMaskList>,
    source_id: Option<NodeFilter<'a>>,
) -> Vec<Item> {
    let mut filter = Filter::default();
    if let Some(masks) = include {
        filter.set_include(masks);
    }
    if let Some(masks) = exclude {
        filter.set_exclude(masks);
    }
    if let Some(node) = source_id {
        filter.set_node(node);
    }
    let inventory = self.inventory.read().await;
    inventory.get_items_by_mask(mask, &filter)
}
/// Stores the EVA directory and re-formats the pid file path against it
/// with `format_path`.
#[inline]
fn update_paths(&mut self, dir_eva: &str) {
    self.dir_eva = dir_eva.to_owned();
    let pid_file = format_path(dir_eva, Some(&self.pid_file), None);
    self.pid_file = pid_file;
}
/// Returns the current value of the `active` flag.
#[inline]
pub fn is_active(&self) -> bool {
self.active.load(atomic::Ordering::SeqCst)
}
/// Sets the RPC client of the core, then of the action manager and of the
/// service manager.
#[inline]
pub async fn set_rpc(&self, rpc: Arc<RpcClient>) {
    *self.rpc.write().await = Some(Arc::clone(&rpc));
    self.action_manager.set_rpc(Arc::clone(&rpc)).await;
    self.service_manager.set_rpc(rpc).await;
}
/// Writes the main core parameters to the log at the debug level.
pub fn log_summary(&self) {
debug!("core.boot_id = {}", self.boot_id);
debug!("core.dir_eva = {}", self.dir_eva);
debug!("core.system_name = {}", self.system_name);
debug!("core.instant_save = {}", self.instant_save);
debug!("core.pid_file = {}", self.pid_file);
debug!("core.suicide_timeout = {:?}", self.suicide_timeout);
debug!("core.timeout = {:?}", self.timeout);
debug!("core.workers = {}", self.workers);
}
#[allow(clippy::cast_sign_loss)]
#[inline]
pub fn set_boot_id(&mut self, db: &mut yedb::Database) -> EResult<()> {
self.boot_id = db.key_increment(®istry::format_data_key("boot-id"))? as u64;
Ok(())
}
/// Returns the core default timeout.
#[inline]
pub fn timeout(&self) -> Duration {
self.timeout
}
/// Returns the service manager.
#[inline]
pub fn service_manager(&self) -> &services::Manager {
&self.service_manager
}
/// Returns the action manager.
#[inline]
pub fn action_manager(&self) -> &actions::Manager {
&self.action_manager
}
/// Returns the configured system name.
#[inline]
pub fn system_name(&self) -> &str {
&self.system_name
}
/// Returns the EVA directory set by `update_paths`.
#[inline]
pub fn dir_eva(&self) -> &str {
&self.dir_eva
}
/// Returns the inventory lock.
pub fn inventory(&self) -> &RwLock<Inventory> {
&self.inventory
}
/// Returns the configured number of workers.
#[inline]
pub fn workers(&self) -> usize {
self.workers
}
/// Writes the process id (as decimal text) to the pid file.
///
/// # Errors
///
/// Returns the I/O error converted to the core error type.
pub async fn write_pid_file(&self) -> EResult<()> {
    let pid = self.pid.to_string();
    tokio::fs::write(&self.pid_file, pid).await?;
    Ok(())
}
/// Spawns the signal handler tasks.
///
/// SIGINT clears the `active` flag only when the `EVA_ENABLE_CC` environment
/// variable equals `1` or when `CARGO_PKG_NAME` is set and not empty;
/// otherwise it is ignored. SIGTERM always clears the `active` flag.
pub async fn register_signals(&self) {
    let enabled_by_env = std::env::var_os("EVA_ENABLE_CC").map_or(false, |v| v == "1");
    let under_cargo = std::env::var_os("CARGO_PKG_NAME").map_or(false, |v| !v.is_empty());
    if under_cargo {
        trace!("running under cargo");
    }
    if enabled_by_env || under_cargo {
        let active = self.active.clone();
        // the signal is logged only when both stdout and stderr are terminals
        handle_term_signal!(
            SignalKind::interrupt(),
            active,
            atty::is(Stream::Stdout) && atty::is(Stream::Stderr)
        );
    } else {
        ignore_term_signal!(SignalKind::interrupt());
    }
    let active = self.active.clone();
    handle_term_signal!(SignalKind::terminate(), active, true);
}
/// Registers a file to be removed when the core is shut down by `block`.
#[inline]
pub fn add_file_to_remove(&mut self, fname: &str) {
    let owned = fname.to_owned();
    self.files_to_remove.push(owned);
}
/// Broadcasts the core status event to all bus clients (`*`) with `QoS::No`.
///
/// # Errors
///
/// Returns an error if the event can not be serialized or the broadcast fails.
///
/// # Panics
///
/// Panics if the RPC client is not set.
async fn announce_core_state(
    &self,
    state: &crate::services::ServiceStatusBroadcastEvent,
) -> EResult<()> {
    let rpc_guard = self.rpc.read().await;
    let client = rpc_guard.as_ref().unwrap().client();
    let mut bus = client.lock().await;
    let payload = rmp_serde::to_vec_named(state)?;
    bus.send_broadcast("*", payload.into(), QoS::No).await?;
    Ok(())
}
/// Starts the action manager.
#[inline]
pub async fn start(&self) -> EResult<()> {
self.action_manager.start().await
}
/// Sets the `active` flag and broadcasts the "ready" core status.
///
/// A broadcast failure is logged and ignored.
#[inline]
pub async fn mark_loaded(&self) {
    self.active.store(true, atomic::Ordering::SeqCst);
    let ready = crate::services::ServiceStatusBroadcastEvent::ready();
    let _r = self.announce_core_state(&ready).await.log_err();
}
/// Blocks until the `active` flag is cleared, then shuts the core down.
///
/// Shutdown sequence: arm the suicide watchdog, broadcast the "terminating"
/// status (failures ignored), save scheduled item states (failures logged),
/// stop the services, kill the process tree, remove the registered files and
/// the pid file.
pub async fn block(&self) {
    info!("EVA ICS local node {} ready", self.system_name);
    while self.active.load(atomic::Ordering::SeqCst) {
        sleep(SLEEP_STEP).await;
    }
    // The watchdog uses the dedicated `suicide_timeout` option: stopping the
    // services alone is allowed to take `timeout`, so arming the watchdog with
    // `timeout` could fire before the clean-up below is completed.
    bmart::process::suicide(self.suicide_timeout, false);
    let _r = self
        .announce_core_state(&crate::services::ServiceStatusBroadcastEvent::terminating())
        .await;
    let _r = self.save().await.log_err();
    self.service_manager
        .stop(&self.system_name, self.timeout)
        .await;
    bmart::process::kill_pstree(std::process::id(), Some(Duration::from_millis(100)), false)
        .await;
    for f in &self.files_to_remove {
        let _r = tokio::fs::remove_file(f).await;
    }
    let _r = tokio::fs::remove_file(&self.pid_file).await;
    info!("the core shutted down");
}
}
/// Subscribes the core bus client to all topics the core processes.
///
/// The raw state, action, replication and error log topics are always
/// subscribed. The trace log topics are added only when the minimal log level
/// equals the trace level; the debug / info / warn log topics are added when
/// the minimal log level is less than or equal to the corresponding level.
/// Each log level is subscribed in both lower and upper case.
///
/// # Errors
///
/// Returns an error if the bulk subscription fails.
pub async fn init_core_client<C>(client: &mut C) -> EResult<()>
where
    C: AsyncClient,
{
    let min_level = crate::logs::get_min_log_level().0;
    let mut topics: Vec<String> = vec![
        format!("{RAW_STATE_TOPIC}#"),
        format!("{}#", actions::ACTION_TOPIC),
        format!("{REPLICATION_STATE_TOPIC}#"),
        format!("{REPLICATION_INVENTORY_TOPIC}#"),
        format!("{REPLICATION_NODE_STATE_TOPIC}#"),
        format!("{LOG_TOPIC}error"),
        format!("{LOG_TOPIC}ERROR"),
    ];
    // (is the level enabled, lower-case topic suffix, upper-case topic suffix)
    let optional_levels = [
        (min_level == eva_common::LOG_LEVEL_TRACE, "trace", "TRACE"),
        (min_level <= eva_common::LOG_LEVEL_DEBUG, "debug", "DEBUG"),
        (min_level <= eva_common::LOG_LEVEL_INFO, "info", "INFO"),
        (min_level <= eva_common::LOG_LEVEL_WARN, "warn", "WARN"),
    ];
    for (enabled, lower, upper) in optional_levels {
        if enabled {
            topics.push(format!("{LOG_TOPIC}{lower}"));
            topics.push(format!("{LOG_TOPIC}{upper}"));
        }
    }
    let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
    client.subscribe_bulk(topic_refs, QoS::No).await?;
    Ok(())
}