Documentation
use crate::actions;
use crate::events::{
    RawStateEvent, ReplicationInventoryItem, ReplicationStateEvent, LOCAL_STATE_TOPIC,
    REMOTE_ARCHIVE_STATE_TOPIC, REMOTE_STATE_TOPIC,
};
use crate::events::{
    RAW_STATE_TOPIC, REPLICATION_INVENTORY_TOPIC, REPLICATION_NODE_STATE_TOPIC,
    REPLICATION_STATE_TOPIC,
};
use crate::items::{self, Filter, Inventory, InventoryStats, Item, NodeFilter};
use crate::logs::LOG_TOPIC;
use crate::registry;
use crate::services;
use crate::time::monotonic_ns;
use crate::tools::format_path;
use crate::tools::ErrLogger;
use crate::SLEEP_STEP;
use crate::{EResult, Error};
use atty::Stream;
use elbus::client::AsyncClient;
use elbus::rpc::{Rpc, RpcClient};
use elbus::QoS;
use eva_common::acl::{OIDMask, OIDMaskList};
use eva_common::prelude::*;
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
use uuid::Uuid;

pub enum LvarOp {
    Set(ItemStatus, Option<Value>),
    Reset,
    Clear,
    Toggle,
}

const ERR_MSG_STATE_LMACRO: &str = "can not update state for lmacro";

#[inline]
async fn announce_local_state<C>(
    oid_path: &str,
    state: &BTreeMap<Value, Value>,
    client: &Arc<Mutex<C>>,
) -> EResult<()>
where
    C: elbus::client::AsyncClient + ?Sized,
{
    client
        .lock()
        .await
        .publish(
            &format!("{}{}", LOCAL_STATE_TOPIC, oid_path),
            rmp_serde::to_vec_named(&state)?.into(),
            QoS::No,
        )
        .await?;
    Ok(())
}

#[inline]
async fn announce_remote_state<C>(
    oid_path: &str,
    state: &BTreeMap<Value, Value>,
    current: bool,
    client: &Arc<Mutex<C>>,
) -> EResult<()>
where
    C: elbus::client::AsyncClient + ?Sized,
{
    client
        .lock()
        .await
        .publish(
            &format!(
                "{}{}",
                if current {
                    REMOTE_STATE_TOPIC
                } else {
                    REMOTE_ARCHIVE_STATE_TOPIC
                },
                oid_path
            ),
            rmp_serde::to_vec_named(&state)?.into(),
            QoS::No,
        )
        .await?;
    Ok(())
}
#[inline]
async fn save_item_state(
    oid_path: &str,
    state: BTreeMap<Value, Value>,
    rpc: &RpcClient,
) -> EResult<()> {
    registry::key_set(registry::R_STATE, oid_path, state, rpc).await?;
    Ok(())
}
#[inline]
async fn save_item_config(
    oid_path: &str,
    config: BTreeMap<Value, Value>,
    rpc: &RpcClient,
) -> EResult<()> {
    registry::key_set(registry::R_INVENTORY, oid_path, config, rpc).await?;
    Ok(())
}
#[inline]
async fn destroy_registry_item(oid_path: &str, rpc: &RpcClient) -> EResult<()> {
    registry::key_delete(registry::R_STATE, oid_path, rpc).await?;
    registry::key_delete(registry::R_INVENTORY, oid_path, rpc).await?;
    Ok(())
}

macro_rules! prepare_state_data {
    ($item: expr, $state: expr, $instant_save: expr) => {{
        let s_st = $item.serialize_state_basic_from($state);
        let db_st = if $instant_save {
            Some(items::serialize_db_state_from($state))
        } else {
            None
        };
        (s_st, db_st)
    }};
}

#[derive(Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Core {
    #[serde(skip_deserializing, default = "crate::get_version_owned")]
    version: String,
    #[serde(skip_deserializing, default = "crate::get_build")]
    build: u64,
    #[serde(skip_deserializing, default = "crate::eapi::get_version")]
    eapi_version: u16,
    #[serde(skip_deserializing)]
    boot_id: u64,
    #[serde(skip)]
    dir_eva: String,
    system_name: String,
    #[serde(skip_serializing)]
    pid_file: String,
    instant_save: bool,
    #[serde(
        skip_serializing,
        deserialize_with = "crate::tools::de_float_as_duration"
    )]
    suicide_timeout: Duration,
    #[serde(
        skip_serializing,
        deserialize_with = "crate::tools::de_float_as_duration"
    )]
    timeout: Duration,
    #[serde(
        skip_serializing,
        deserialize_with = "crate::tools::de_float_as_duration"
    )]
    keep_action_history: Duration,
    workers: usize,
    #[serde(skip_deserializing, default = "std::process::id")]
    pid: u32,
    #[serde(
        skip_deserializing,
        serialize_with = "crate::tools::serialize_time_now"
    )]
    time: (),
    #[serde(
        skip_deserializing,
        default = "Instant::now",
        serialize_with = "crate::tools::serialize_uptime"
    )]
    uptime: Instant,
    #[serde(skip)]
    active: Arc<atomic::AtomicBool>,
    #[serde(skip)]
    files_to_remove: Vec<String>,
    #[serde(skip)]
    inventory: Arc<RwLock<Inventory>>,
    #[serde(skip)]
    rpc: RwLock<Option<Arc<RpcClient>>>,
    #[serde(skip)]
    inv_process: std::sync::Mutex<HashSet<String>>,
    #[serde(skip)]
    scheduled_saves: std::sync::Mutex<HashSet<OID>>,
    #[serde(skip)]
    action_manager: Arc<actions::Manager>,
    #[serde(skip)]
    service_manager: services::Manager,
}

macro_rules! handle_term_signal {
    ($kind: expr, $active: expr, $can_log: expr) => {
        tokio::spawn(async move {
            trace!("starting handler for {:?}", $kind);
            loop {
                match signal($kind) {
                    Ok(mut v) => {
                        v.recv().await;
                    }
                    Err(e) => {
                        error!("Unable to bind to signal {:?}: {}", $kind, e);
                        break;
                    }
                }
                if $can_log {
                    debug!("got termination signal");
                } else {
                    crate::logs::disable_console_log();
                }
                $active.store(false, atomic::Ordering::SeqCst);
            }
        });
    };
}

macro_rules! ignore_term_signal {
    ($kind: expr) => {
        tokio::spawn(async move {
            trace!("starting empty handler for {:?}", $kind);
            loop {
                match signal($kind) {
                    Ok(mut v) => {
                        v.recv().await;
                    }
                    Err(e) => {
                        error!("Unable to bind to signal {:?}: {}", $kind, e);
                        break;
                    }
                }
                trace!("got termination signal, ignoring");
            }
        });
    };
}

/// NOTE
/// the local state can be changed by 3 methods only:
/// create (sets blank)
/// update_state_from_raw (sets the state from the raw event)
/// deploy_local_items (may copy the old state if replace is allowed and the item exists)
///
/// Each of the methods (except create) MUST apply the item logic (if defined). If a new method is
/// added, list it here
///
/// Remote state updates do not need to apply the logic as remote items do not have it
impl Core {
    pub fn new_from_db(db: &mut yedb::Database, dir_eva: &str) -> EResult<Self> {
        let mut core: Core =
            serde_json::from_value(db.key_get(&registry::format_config_key("core"))?)?;
        core.update_paths(dir_eva);
        core.action_manager.set_keep_for(core.keep_action_history);
        Ok(core)
    }
    pub fn load_inventory(&self, db: &mut yedb::Database) -> EResult<()> {
        let inv_key = registry::format_top_key(registry::R_INVENTORY);
        let inv_offs = inv_key.len() + 1;
        info!("loading inventory");
        let inv = db.key_get_recursive(&inv_key)?;
        let st_key = registry::format_top_key(registry::R_STATE);
        let st_offs = st_key.len() + 1;
        info!("loading states");
        let st = db.key_get_recursive(&st_key)?;
        let mut states: HashMap<String, serde_json::Value> = HashMap::new();
        for (p, i) in st {
            states.insert(p[st_offs..].to_owned(), i);
        }
        info!("creating inventory");
        let mut inventory = self.inventory.try_write()?;
        for (p, i) in inv {
            let s = &p[inv_offs..];
            let oid: OID = s.parse()?;
            debug!("loading {oid}");
            let state = if let Some(st) = states.remove(s) {
                Some(serde_json::from_value(st)?)
            } else {
                None
            };
            inventory.append_item_from_value(Some(&oid), i, state, self.boot_id, true, false)?;
        }
        Ok(())
    }
    #[inline]
    fn generate_ieid(&self) -> IEID {
        IEID::new(self.boot_id, monotonic_ns())
    }
    #[inline]
    pub fn boot_id(&self) -> u64 {
        self.boot_id
    }
    #[inline]
    pub async fn inventory_stats(&self) -> InventoryStats {
        self.inventory.read().await.stats()
    }
    /// # Panics
    ///
    /// Will panic if the mutex is poisoned
    pub fn lock_source(&self, source_id: &str) -> bool {
        let mut inv_process = self.inv_process.lock().unwrap();
        if inv_process.contains(source_id) {
            false
        } else {
            inv_process.insert(source_id.to_owned());
            true
        }
    }
    /// # Panics
    ///
    /// Will panic if the mutex is poisoned
    #[inline]
    pub fn unlock_source(&self, source_id: &str) {
        self.inv_process.lock().unwrap().remove(source_id);
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set or the mutex is poisoned
    pub async fn save(&self) -> EResult<()> {
        let oids: Vec<OID> = self.scheduled_saves.lock().unwrap().drain().collect();
        if oids.is_empty() {
            return Ok(());
        }
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        for oid in oids {
            let r = self.inventory.read().await.get_item(&oid);
            if let Some(s_state) = r.and_then(|item| item.serialize_db_state()) {
                save_item_state(&oid.to_path(), s_state, rpc)
                    .await
                    .log_err()?;
            }
        }
        Ok(())
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set
    pub async fn create_local_item(&self, oid: OID) -> EResult<Item> {
        let tp = oid.kind();
        let ieid = if tp == EvaItemKind::Lmacro {
            None
        } else {
            Some(self.generate_ieid())
        };
        let result = self.inventory.write().await.create_item(oid, ieid, None);
        match result {
            Ok(item) => {
                let rpc_c = self.rpc.read().await;
                let rpc = rpc_c.as_ref().unwrap();
                let oid_path = item.oid().to_path();
                save_item_config(
                    &oid_path,
                    item.serialize_config()
                        .ok_or_else(|| Error::core("unable to serialize config"))?,
                    rpc,
                )
                .await?;
                if let Some(stc) = item.state() {
                    let (s_state, db_st) =
                        prepare_state_data!(item, &stc.lock().unwrap(), self.instant_save);
                    self.process_new_state(item.oid(), s_state, db_st, rpc)
                        .await?;
                }
                info!("item created: {}", item.oid());
                Ok(item)
            }
            v => v.log_err(),
        }
    }
    async fn process_new_state(
        &self,
        oid: &OID,
        s_state: BTreeMap<Value, Value>,
        db_st: Option<BTreeMap<Value, Value>>,
        rpc: &RpcClient,
    ) -> EResult<()> {
        let oid_path = oid.to_path();
        let _r = announce_local_state(&oid_path, &s_state, &rpc.client())
            .await
            .log_err();
        if let Some(db_state) = db_st {
            save_item_state(&oid_path, db_state, rpc).await.log_err()?;
        } else {
            self.schedule_save(oid);
        }
        Ok(())
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set
    ///
    /// Destroys local items by OID or mask
    #[inline]
    pub async fn destroy_local_items(&self, mask: &OIDMask) {
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        let items = self
            .inventory
            .read()
            .await
            .get_items_by_mask(mask, &Filter::default().node(NodeFilter::Local));
        for item in items {
            let _r = self.inventory.write().await.remove_item(item.oid());
            let _r = destroy_registry_item(&item.oid().to_path(), rpc)
                .await
                .log_err();
            info!("item destroyed: {}", item.oid());
            self.unschedule_save(item.oid());
        }
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set
    ///
    /// Creates/recreates local items from Vec of values
    #[inline]
    pub async fn deploy_local_items(&self, configs: Vec<Value>, replace: bool) -> EResult<()> {
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        for val in configs {
            let item = self.inventory.write().await.append_item_from_value(
                None,
                val.try_into()?,
                None,
                self.boot_id,
                replace,
                false,
            )?;
            let oid_path = item.oid().to_path();
            save_item_config(
                &oid_path,
                item.serialize_config()
                    .ok_or_else(|| Error::core("unable to serialize config"))?,
                rpc,
            )
            .await?;
            if let Some(stc) = item.state() {
                let (s_state, db_st) =
                    prepare_state_data!(item, &stc.lock().unwrap(), self.instant_save);
                self.process_new_state(item.oid(), s_state, db_st, rpc)
                    .await?;
            }
        }
        Ok(())
    }

    #[inline]
    pub async fn terminate_action(&self, uuid: &Uuid) -> EResult<()> {
        self.action_manager
            .terminate_action(uuid, self.timeout)
            .await
    }

    pub async fn kill_actions(&self, oid: &OID) -> EResult<()> {
        let target = {
            let inv = self.inventory.read().await;
            if let Some(unit) = inv.get_item(oid) {
                if let Some(action_params) = unit.action() {
                    action_params.svc().to_owned()
                } else {
                    return Err(Error::failed("unit action not configured"));
                }
            } else {
                return Err(Error::not_found(format!("unit not found: {}", oid)));
            }
        };
        self.action_manager
            .kill_actions(oid, &target, self.timeout)
            .await
    }

    /// Returns the result already serialized to avoid value copying
    #[inline]
    pub fn action_result_serialized(&self, uuid: &Uuid) -> EResult<Vec<u8>> {
        if let Some(info) = self.action_manager.get_action_serialized(uuid)? {
            Ok(info)
        } else {
            Err(Error::not_found("action not found"))
        }
    }
    /// # Panics
    ///
    /// Will panic if state mutex is poisoned
    #[allow(clippy::too_many_lines)]
    pub async fn action(
        &self,
        oid: &OID,
        status: ItemStatus,
        value: Option<Value>,
        priority: u8,
    ) -> EResult<(Uuid, triggered::Listener)> {
        if oid.kind() != EvaItemKind::Unit {
            return Err(Error::invalid_data("item is not a unit"));
        }
        let (action, listener, core_listener) = {
            let inv = self.inventory.read().await;
            if let Some(unit) = inv.get_item(oid) {
                if !unit.is_enabled() {
                    return Err(Error::access("unit is disabled"));
                }
                if let Some(source) = unit.source() {
                    let (action, listener, core_listener) =
                        actions::Action::new_unit(actions::UnitAction {
                            oid,
                            status,
                            value,
                            timeout: None,
                            priority,
                            config: None,
                            node: Some(source.node().to_owned()),
                            target: source.svc().to_owned(),
                        });
                    let s_st = if let Some(stc) = unit.state() {
                        let mut state = stc.lock().unwrap();
                        state.act_incr(self.generate_ieid());
                        Some(unit.serialize_state_basic_from(&state))
                    } else {
                        None
                    };
                    if let Some(s_state) = s_st {
                        let rpc_c = self.rpc.read().await;
                        let rpc = rpc_c.as_ref().unwrap();
                        announce_local_state(&unit.oid().to_path(), &s_state, &rpc.client())
                            .await?;
                    }
                    (action, listener, core_listener)
                } else if let Some(action_params) = unit.action() {
                    let (action, listener, core_listener) =
                        actions::Action::new_unit(actions::UnitAction {
                            oid,
                            status,
                            value,
                            timeout: Some(action_params.timeout().unwrap_or(self.timeout)),
                            priority,
                            config: action_params.config().map(Clone::clone),
                            node: None,
                            target: action_params.svc().to_owned(),
                        });
                    let s_st = if let Some(stc) = unit.state() {
                        let mut state = stc.lock().unwrap();
                        state.act_incr(self.generate_ieid());
                        Some(unit.serialize_state_basic_from(&state))
                    } else {
                        None
                    };
                    if let Some(s_state) = s_st {
                        let rpc_c = self.rpc.read().await;
                        let rpc = rpc_c.as_ref().unwrap();
                        announce_local_state(&unit.oid().to_path(), &s_state, &rpc.client())
                            .await?;
                    }
                    (action, listener, core_listener)
                } else {
                    return Err(Error::failed("unit action not configured"));
                }
            } else {
                return Err(Error::not_found("unit not found"));
            }
        };
        let (t_accepted, l_accepted) = triggered::trigger();
        let action_manager = self.action_manager.clone();
        let uuid = *action.uuid();
        let inventory = self.inventory.clone();
        let oid = oid.clone();
        let boot_id = self.boot_id;
        let default_timeout = self.timeout;
        tokio::spawn(async move {
            let timeout = action.timeout().unwrap_or(default_timeout);
            let _r = action_manager
                .launch_action(action, t_accepted, timeout)
                .await
                .log_err();
            if tokio::time::timeout(timeout, core_listener).await.is_err() {
                action_manager.mark_action_timed_out(&uuid);
            }
            let inv = inventory.read().await;
            if let Some(unit) = inv.get_item(&oid) {
                let s_st = if let Some(stc) = unit.state() {
                    let mut state = stc.lock().unwrap();
                    let ieid = IEID::new(boot_id, monotonic_ns());
                    state.act_decr(ieid);
                    Some(unit.serialize_state_basic_from(&state))
                } else {
                    None
                };
                if let Some(s_state) = s_st {
                    let rpc_c = action_manager.rpc().read().await;
                    let rpc = rpc_c.as_ref().unwrap();
                    let _r = announce_local_state(&unit.oid().to_path(), &s_state, &rpc.client())
                        .await
                        .log_err();
                }
            }
        });
        l_accepted.await;
        Ok((uuid, listener))
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set
    ///
    /// Destroys local items by vec of OIDs
    #[inline]
    pub async fn undeploy_local_items(&self, oids: &[&OID]) {
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        for oid in oids {
            let result = self.inventory.write().await.remove_item(oid);
            if result.is_some() {
                let _r = destroy_registry_item(&oid.to_path(), rpc).await.log_err();
                info!("item destroyed: {}", oid);
            } else {
                warn!("item not found: {}", oid);
            }
            self.unschedule_save(oid);
        }
    }
    #[inline]
    pub async fn mark_source_online(&self, source_id: &str, online: bool) {
        self.inventory
            .read()
            .await
            .mark_source_online(source_id, online);
    }
    pub async fn destroy_source(&self, source_id: &str) {
        let source = self
            .inventory
            .read()
            .await
            .get_or_create_source(source_id, "");
        source.mark_destroyed();
        while !self.lock_source(source_id) {
            // wait until inventory processor abort
            sleep(SLEEP_STEP).await;
        }
        let i = self.inventory.read().await.get_items_by_source(source_id);
        if let Some(items) = i {
            for (oid, _item) in items {
                let _r = self.inventory.write().await.remove_item(&oid);
            }
        }
        info!("source destroyed: {}", source_id);
        self.unlock_source(source_id);
    }
    /// # Panics
    ///
    /// Will panic if the state mutex is poisoned
    ///
    /// Sets lvar state from RPC call
    ///
    /// LVar state is always announced
    pub async fn lvar_op(&self, oid: &OID, op: LvarOp) -> EResult<()> {
        if oid.kind() != EvaItemKind::Lvar {
            return Err(Error::not_implemented(
                "Lvar ops can be applied to Lvars only",
            ));
        }
        let lvar = self
            .inventory
            .read()
            .await
            .get_item(oid)
            .ok_or_else(|| Error::not_found(oid))?;
        if !lvar.is_enabled() {
            return Err(Error::access(format!("Lvar {} is disabled", oid)));
        }
        if let Some(st) = lvar.state() {
            let rpc_c = self.rpc.read().await;
            let rpc = rpc_c.as_ref().unwrap();
            let (s_state, db_st) = {
                let mut state = st.lock().unwrap();
                let ieid = self.generate_ieid();
                match op {
                    LvarOp::Set(status, value) => {
                        trace!("setting lvar {} state to {} {:?}", oid, status, value);
                        state.force_set_state(status, value, ieid);
                    }
                    LvarOp::Reset => {
                        trace!("resetting lvar {} state", oid);
                        state.force_set_state(1, None, ieid);
                    }
                    LvarOp::Clear => {
                        trace!("clearing lvar {} state", oid);
                        state.force_set_state(0, None, ieid);
                    }
                    LvarOp::Toggle => {
                        trace!("toggling lvar {} state", oid);
                        let st = state.status();
                        state.force_set_state(if st == 0 { 1 } else { 0 }, None, ieid);
                    }
                }
                prepare_state_data!(lvar, &state, self.instant_save)
            };
            self.process_new_state(oid, s_state, db_st, rpc).await?;
            Ok(())
        } else {
            Err(Error::core(format!("Lvar {} has no state", oid)))
        }
    }
    /// # Panics
    ///
    /// Will panic if the state mutex is poisoned
    ///
    /// Note: LVar state is not updated when the status is 0
    pub async fn update_state_from_raw(&self, oid: &OID, raw: RawStateEvent) -> EResult<()> {
        let tp = oid.kind();
        if tp == EvaItemKind::Lmacro {
            return Err(Error::not_implemented(ERR_MSG_STATE_LMACRO));
        }
        let item = self
            .inventory
            .read()
            .await
            .get_item(oid)
            .ok_or_else(|| Error::not_found(oid))?;
        if item.source().is_some() {
            return Err(Error::busy(format!(
                "unable to update item {} from raw event: remote",
                oid
            )));
        }
        if item.is_enabled() {
            if let Some(state) = item.state() {
                debug!(
                    "setting state from raw event for {}, status: {}, value: {:?}",
                    oid,
                    raw.status(),
                    raw.value()
                );
                let (s_state, db_st) = {
                    let mut state = state.lock().unwrap();
                    if tp == EvaItemKind::Lvar && state.status() == 0 {
                        // lvars with status 0 are not set from RAW
                        return Ok(());
                    }
                    if state.set_from_raw(raw, item.logic(), item.oid(), self.boot_id) {
                        prepare_state_data!(item, &state, self.instant_save)
                    } else {
                        // not modified
                        return Ok(());
                    }
                };
                let rpc_c = self.rpc.read().await;
                let rpc = rpc_c.as_ref().unwrap();
                self.process_new_state(item.oid(), s_state, db_st, rpc)
                    .await?;
            } else {
                warn!("no state property in {}", oid);
            }
        } else {
            debug!("ignoring state from raw event for {} - disabled", oid);
        }
        Ok(())
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set
    // if refractoring, do not write-lock the inventory for long, as the process may take a very
    // long time!
    #[allow(clippy::too_many_lines)]
    pub async fn process_remote_inventory(
        &self,
        remote_inv: HashMap<OID, ReplicationInventoryItem>,
        source_id: &str,
        sender: &str,
    ) -> EResult<()> {
        #[inline]
        fn check_state(item: &ReplicationInventoryItem) -> bool {
            let tp = item.oid.kind();
            match tp {
                EvaItemKind::Lvar | EvaItemKind::Sensor => {
                    if item.act.is_some() {
                        warn!("invalid repl item {} state", item.oid);
                        return false;
                    }
                }
                EvaItemKind::Lmacro => {
                    if item.status.is_some() || item.value.is_some() || item.act.is_some() {
                        warn!("invalid repl item {} state", item.oid);
                        return false;
                    }
                }
                EvaItemKind::Unit => {}
            }
            true
        }
        if source_id.starts_with('.') {
            return Err(Error::invalid_params(
                "source ids starting with dots are reserved, ignoring incoming payload",
            ));
        }
        if !self.lock_source(source_id) {
            warn!(
                "source {} inventory processor is busy. ignoring incoming payload",
                source_id
            );
            return Ok(());
        }
        let (existing_items, source) = {
            let inv = self.inventory.read().await;
            (
                inv.get_items_by_source(source_id),
                inv.get_or_create_source(source_id, sender),
            )
        };
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        if let Some(existing) = existing_items {
            // remove deleted items
            for oid in existing.keys() {
                if source.is_destroyed() {
                    break;
                }
                if !remote_inv.contains_key(oid.as_ref()) {
                    debug!(
                        "removing remote item {}, node: {} from rpl {}",
                        oid, source_id, sender
                    );
                    let _r = self.inventory.write().await.remove_item(oid);
                }
            }
            // append new and modified items, for non-modified - update state only
            for remote in remote_inv.into_values() {
                if source.is_destroyed() {
                    break;
                }
                if let Some(ex) = existing.get(&remote.oid) {
                    if ex.source().is_none() {
                        warn!(
                            "attempt to modify local item {} from rpl by {}, ignored",
                            ex.oid(),
                            sender
                        );
                        continue;
                    }
                    if ex.meta() == remote.meta.as_ref() {
                        debug!("setting state for {} from rpl inv {}", remote.oid, sender);
                        if ex.is_enabled() != remote.enabled {
                            ex.set_enabled(remote.enabled);
                        }
                        if check_state(&remote) {
                            if let Some(st) = ex.state() {
                                if let Some(s_state) = {
                                    let mut state = st.lock().unwrap();
                                    let rse: ReplicationStateEvent = remote.into();
                                    if rse.ieid() > state.ieid() {
                                        state.set_from_rse(rse);
                                        Some(ex.serialize_state_basic_from(&state))
                                    } else {
                                        None
                                    }
                                } {
                                    let _r = announce_remote_state(
                                        &ex.oid().to_path(),
                                        &s_state,
                                        true,
                                        &rpc.client(),
                                    )
                                    .await
                                    .log_err();
                                }
                            }
                        }
                        continue;
                    }
                };
                debug!(
                    "creating remote item {}, node: {} from rpl {}",
                    remote.oid, source_id, sender
                );
                if let Ok(item) = self
                    .inventory
                    .write()
                    .await
                    .append_remote_item(remote, source.clone())
                    .log_err()
                {
                    let s_state = item.serialize_state(self.system_name());
                    let _r =
                        announce_remote_state(&item.oid().to_path(), &s_state, true, &rpc.client())
                            .await
                            .log_err();
                }
            }
        } else {
            // no source yet, add all items
            for remote in remote_inv.into_values() {
                if source.is_destroyed() {
                    break;
                }
                debug!(
                    "creating remote item {}, node: {} from rpl {}",
                    remote.oid, source_id, sender
                );
                if check_state(&remote) {
                    if let Ok(item) = self
                        .inventory
                        .write()
                        .await
                        .append_remote_item(remote, source.clone())
                        .log_err()
                    {
                        let s_state = item.serialize_state(self.system_name());
                        let _r = announce_remote_state(
                            &item.oid().to_path(),
                            &s_state,
                            true,
                            &rpc.client(),
                        )
                        .await
                        .log_err();
                    }
                }
            }
        }
        debug!("source inventory processed: {}", source_id);
        self.unlock_source(source_id);
        Ok(())
    }
    /// # Panics
    ///
    /// Will panic if the mutex is poisoned
    pub async fn update_state_from_repl(
        &self,
        oid: &OID,
        rse: ReplicationStateEvent,
        sender: &str,
    ) -> EResult<()> {
        let tp = oid.kind();
        match tp {
            EvaItemKind::Lmacro => return Err(Error::not_implemented(ERR_MSG_STATE_LMACRO)),
            EvaItemKind::Lvar | EvaItemKind::Sensor => {
                if rse.act().is_some() {
                    return Err(Error::invalid_data(format!(
                        "invalid state payload for {}",
                        oid
                    )));
                }
            }
            EvaItemKind::Unit => {}
        }
        let item = self
            .inventory
            .read()
            .await
            .get_item(oid)
            .ok_or_else(|| Error::not_found(oid))?;
        if let Some(state) = item.state() {
            debug!("setting state from repl event for {}, from {}", oid, sender);
            if let Some(source) = item.source() {
                if rse.node().as_ref().map_or(true, |n| *n != source.node()) {
                    return Err(Error::busy(format!(
                        "unable to set item state {} from rpl event from {}: node differs",
                        oid, sender
                    )));
                }
            }
            let (s_state, current) = {
                let mut state = state.lock().unwrap();
                if rse.ieid() > state.ieid() {
                    state.set_from_rse(rse);
                    (item.serialize_state_from(&state, self.system_name()), true)
                } else {
                    (
                        item.serialize_state_from(&rse.into(), self.system_name()),
                        false,
                    )
                }
            };
            let rpc_c = self.rpc.read().await;
            let rpc = rpc_c.as_ref().unwrap();
            let oid_path = item.oid().to_path();
            let _r = announce_remote_state(&oid_path, &s_state, current, &rpc.client())
                .await
                .log_err();
        } else {
            warn!("no state property in {}", oid);
        }
        Ok(())
    }
    #[inline]
    fn schedule_save(&self, oid: &OID) {
        self.scheduled_saves.lock().unwrap().insert(oid.clone());
    }
    #[inline]
    fn unschedule_save(&self, oid: &OID) {
        self.scheduled_saves.lock().unwrap().remove(oid);
    }
    #[inline]
    pub async fn set_local_item_enabled(&self, mask: &OIDMask, value: bool) -> EResult<()> {
        self.inventory
            .read()
            .await
            .get_items_by_mask(mask, &Filter::default())
            .iter()
            .filter(|item| item.source().is_none())
            .for_each(|item| item.set_enabled(value));
        Ok(())
    }
    /// # Panics
    ///
    /// Will panic if the core rpc is not set or the mutex is poisoned
    pub async fn announce_local(&self) {
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        let items = self.inventory.read().await.list_local_items();
        for item in items {
            if let Some(s_state) = item.serialize_state_basic() {
                let _r = announce_local_state(&item.oid().to_path(), &s_state, &rpc.client())
                    .await
                    .log_err();
            }
        }
    }
    #[inline]
    pub async fn list_items<'a>(
        &self,
        mask: &OIDMask,
        include: Option<&OIDMaskList>,
        exclude: Option<&OIDMaskList>,
        source_id: Option<NodeFilter<'a>>,
    ) -> Vec<Item> {
        let mut filter = Filter::default();
        if let Some(v) = include {
            filter.set_include(v);
        }
        if let Some(v) = exclude {
            filter.set_exclude(v);
        }
        if let Some(v) = source_id {
            filter.set_node(v);
        }
        self.inventory.read().await.get_items_by_mask(mask, &filter)
    }
    #[inline]
    fn update_paths(&mut self, dir_eva: &str) {
        self.dir_eva = dir_eva.to_owned();
        self.pid_file = format_path(dir_eva, Some(&self.pid_file), None);
    }
    #[inline]
    pub fn is_active(&self) -> bool {
        self.active.load(atomic::Ordering::SeqCst)
    }
    #[inline]
    pub async fn set_rpc(&self, rpc: Arc<RpcClient>) {
        self.rpc.write().await.replace(rpc.clone());
        self.action_manager.set_rpc(rpc.clone()).await;
        self.service_manager.set_rpc(rpc).await;
    }
    pub fn log_summary(&self) {
        debug!("core.boot_id = {}", self.boot_id);
        debug!("core.dir_eva = {}", self.dir_eva);
        debug!("core.system_name = {}", self.system_name);
        debug!("core.instant_save = {}", self.instant_save);
        debug!("core.pid_file = {}", self.pid_file);
        debug!("core.suicide_timeout = {:?}", self.suicide_timeout);
        debug!("core.timeout = {:?}", self.timeout);
        debug!("core.workers = {}", self.workers);
    }
    #[allow(clippy::cast_sign_loss)]
    #[inline]
    pub fn set_boot_id(&mut self, db: &mut yedb::Database) -> EResult<()> {
        self.boot_id = db.key_increment(&registry::format_data_key("boot-id"))? as u64;
        Ok(())
    }
    #[inline]
    pub fn timeout(&self) -> Duration {
        self.timeout
    }
    #[inline]
    pub fn service_manager(&self) -> &services::Manager {
        &self.service_manager
    }
    #[inline]
    pub fn action_manager(&self) -> &actions::Manager {
        &self.action_manager
    }
    #[inline]
    pub fn system_name(&self) -> &str {
        &self.system_name
    }
    #[inline]
    pub fn dir_eva(&self) -> &str {
        &self.dir_eva
    }
    pub fn inventory(&self) -> &RwLock<Inventory> {
        &self.inventory
    }
    #[inline]
    pub fn workers(&self) -> usize {
        self.workers
    }
    pub async fn write_pid_file(&self) -> EResult<()> {
        tokio::fs::write(&self.pid_file, self.pid.to_string())
            .await
            .map_err(Into::into)
    }
    pub async fn register_signals(&self) {
        let mut handle_cc = match std::env::var_os("EVA_ENABLE_CC") {
            Some(v) => v == "1",
            None => false,
        };
        // always handle cc if run with cargo
        if let Some(v) = std::env::var_os("CARGO_PKG_NAME") {
            if !v.is_empty() {
                handle_cc = true;
                trace!("running under cargo");
            }
        };
        if handle_cc {
            let active = self.active.clone();
            handle_term_signal!(
                SignalKind::interrupt(),
                active,
                atty::is(Stream::Stdout) && atty::is(Stream::Stderr)
            );
        } else {
            ignore_term_signal!(SignalKind::interrupt());
        }
        let active = self.active.clone();
        handle_term_signal!(SignalKind::terminate(), active, true);
    }
    #[inline]
    pub fn add_file_to_remove(&mut self, fname: &str) {
        self.files_to_remove.push(fname.to_owned());
    }
    async fn announce_core_state(
        &self,
        state: &crate::services::ServiceStatusBroadcastEvent,
    ) -> EResult<()> {
        self.rpc
            .read()
            .await
            .as_ref()
            .unwrap()
            .client()
            .lock()
            .await
            .send_broadcast("*", rmp_serde::to_vec_named(state)?.into(), QoS::No)
            .await?;
        Ok(())
    }
    #[inline]
    pub async fn start(&self) -> EResult<()> {
        self.action_manager.start().await
    }
    #[inline]
    pub async fn mark_loaded(&self) {
        self.active.store(true, atomic::Ordering::SeqCst);
        let _r = self
            .announce_core_state(&crate::services::ServiceStatusBroadcastEvent::ready())
            .await
            .log_err();
    }
    pub async fn block(&self) {
        info!("EVA ICS local node {} ready", self.system_name);
        while self.active.load(atomic::Ordering::SeqCst) {
            sleep(SLEEP_STEP).await;
        }
        bmart::process::suicide(self.timeout, false);
        let _r = self
            .announce_core_state(&crate::services::ServiceStatusBroadcastEvent::terminating())
            .await;
        let _r = self.save().await.log_err();
        self.service_manager
            .stop(&self.system_name, self.timeout)
            .await;
        bmart::process::kill_pstree(std::process::id(), Some(Duration::from_millis(100)), false)
            .await;
        for f in &self.files_to_remove {
            let _r = tokio::fs::remove_file(f).await;
        }
        let _r = tokio::fs::remove_file(&self.pid_file).await;
        info!("the core shutted down");
    }
}

pub async fn init_core_client<C>(client: &mut C) -> EResult<()>
where
    C: AsyncClient,
{
    let lvl = crate::logs::get_min_log_level();
    let mut topics: Vec<String> = vec![
        format!("{RAW_STATE_TOPIC}#"),
        format!("{}#", actions::ACTION_TOPIC),
        format!("{REPLICATION_STATE_TOPIC}#"),
        format!("{REPLICATION_INVENTORY_TOPIC}#"),
        format!("{REPLICATION_NODE_STATE_TOPIC}#"),
        format!("{LOG_TOPIC}error"),
        format!("{LOG_TOPIC}ERROR"),
    ];
    if lvl.0 == eva_common::LOG_LEVEL_TRACE {
        topics.push(format!("{LOG_TOPIC}trace"));
        topics.push(format!("{LOG_TOPIC}TRACE"));
    }
    if lvl.0 <= eva_common::LOG_LEVEL_DEBUG {
        topics.push(format!("{LOG_TOPIC}debug"));
        topics.push(format!("{LOG_TOPIC}DEBUG"));
    }
    if lvl.0 <= eva_common::LOG_LEVEL_INFO {
        topics.push(format!("{LOG_TOPIC}info"));
        topics.push(format!("{LOG_TOPIC}INFO"));
    }
    if lvl.0 <= eva_common::LOG_LEVEL_WARN {
        topics.push(format!("{LOG_TOPIC}warn"));
        topics.push(format!("{LOG_TOPIC}WARN"));
    }
    client
        .subscribe_bulk(
            topics.iter().map(|item| &**item).collect::<Vec<&str>>(),
            QoS::No,
        )
        .await?;
    Ok(())
}