Documentation
use crate::actions;
use crate::core::{Core, LvarOp};
use crate::events::{
    NodeStateEvent, NodeStatus, RawStateEvent, ReplicationInventoryItem, ReplicationStateEvent,
    RAW_STATE_TOPIC, REPLICATION_INVENTORY_TOPIC, REPLICATION_NODE_STATE_TOPIC,
    REPLICATION_STATE_TOPIC,
};
use crate::items::NodeFilter;
use crate::logs::{LogLevel, LOG_TOPIC};
use crate::services;
use crate::services::emit;
use crate::tools::ErrLogger;
use crate::Error;
use elbus::{
    rpc::{rpc_err_str, RpcError, RpcEvent, RpcHandlers, RpcResult},
    Frame, FrameKind,
};
use eva_common::acl::{OIDMask, OIDMaskList};
use eva_common::prelude::*;
use log::warn;
use regex::Regex;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;

/// EAPI version number; returned verbatim by [`get_version`].
pub const EAPI_VERSION: u16 = 1;

/// Returns the EAPI version number ([`EAPI_VERSION`]).
#[inline]
pub fn get_version() -> u16 {
    EAPI_VERSION
}

/// Bus-facing API object: its `RpcHandlers` implementation forwards incoming
/// frames and RPC calls to the shared [`Core`].
pub struct ElbusApi {
    /// Shared handle to the core that performs the actual work.
    core: Arc<Core>,
}

impl ElbusApi {
    /// Creates the API object around an already constructed, shared core.
    pub fn new(core: Arc<Core>) -> Self {
        Self { core }
    }
}

/// Call parameters consisting of a single identifier, borrowed from the
/// request payload. Unknown fields are rejected.
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct ParamsId<'a> {
    #[serde(borrow)]
    i: &'a str, // OID, OID mask or service id, depending on the method (parsed by the handler)
}

/// Call parameters consisting of a single UUID (used by the `action.result`
/// and `action.terminate` methods). Unknown fields are rejected.
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct ParamsUuid {
    /// UUID of the action the call refers to.
    u: Uuid,
}

#[async_trait::async_trait]
impl RpcHandlers for ElbusApi {
    /// Bus notifications are not used by this API and are silently dropped.
    async fn handle_notification(&self, _event: RpcEvent) {}

    /// Processes frames published on the bus, dispatching by topic prefix.
    ///
    /// Nothing is processed while the core is inactive, and only `Publish`
    /// frames carrying a topic are considered. Handled topic families:
    ///
    /// * `RAW_STATE_TOPIC` - raw item state update, the topic remainder is the OID
    /// * `actions::ACTION_TOPIC` - action event, passed to the action manager
    /// * `REPLICATION_STATE_TOPIC` - replicated item state, the remainder is the OID
    /// * `REPLICATION_INVENTORY_TOPIC` - remote inventory, the remainder is
    ///   passed to the core together with the frame sender
    /// * `REPLICATION_NODE_STATE_TOPIC` - node online/offline/removed event
    /// * `LOG_TOPIC` - log message, the remainder is the log level
    ///
    /// Malformed topics or payloads are reported with a warning and otherwise
    /// ignored; errors returned by the core are logged.
    async fn handle_frame(&self, frame: Frame) {
        if !self.core.is_active() {
            return;
        }
        if frame.kind() != FrameKind::Publish {
            return;
        }
        if let Some(ref topic) = frame.topic() {
            if let Some(raw_topic) = topic.strip_prefix(RAW_STATE_TOPIC) {
                if let Ok(oid) = raw_topic.parse::<OID>() {
                    if let Ok(raw) = rmp_serde::from_slice::<RawStateEvent>(frame.payload()) {
                        let _r = self.core.update_state_from_raw(&oid, raw).await.log_err();
                    } else {
                        warn!("Invalid payload in raw event: {}", raw_topic);
                    }
                } else {
                    warn!("Invalid OID in raw event: {}", raw_topic);
                }
            } else if topic.starts_with(actions::ACTION_TOPIC) {
                if let Ok(action_event) =
                    rmp_serde::from_slice::<actions::ActionEvent>(frame.payload())
                {
                    let _r = self
                        .core
                        .action_manager()
                        .process_event(action_event)
                        .log_err();
                } else {
                    warn!("Invalid payload in action event, topic {}", topic);
                }
            } else if let Some(repl_topic) = topic.strip_prefix(REPLICATION_STATE_TOPIC) {
                if let Ok(oid) = repl_topic.parse::<OID>() {
                    if let Ok(rpl) =
                        rmp_serde::from_slice::<ReplicationStateEvent>(frame.payload())
                    {
                        let _r = self
                            .core
                            .update_state_from_repl(&oid, rpl, frame.sender())
                            .await
                            .log_err();
                    } else {
                        warn!(
                            "Invalid payload in replication state event: {}",
                            repl_topic
                        );
                    }
                } else {
                    warn!("Invalid OID in replication state event: {}", repl_topic);
                }
            } else if let Some(inv_topic) = topic.strip_prefix(REPLICATION_INVENTORY_TOPIC) {
                if let Ok(remote_items) =
                    rmp_serde::from_slice::<Vec<ReplicationInventoryItem>>(frame.payload())
                {
                    // Index the inventory by OID. Iterating in reverse keeps
                    // the earliest entry of the payload when an OID is listed
                    // more than once (later inserts overwrite earlier ones).
                    let remote_inv: HashMap<_, _> = remote_items
                        .into_iter()
                        .rev()
                        .map(|item| (item.oid.clone(), item))
                        .collect();
                    let _r = self
                        .core
                        .process_remote_inventory(remote_inv, inv_topic, frame.sender())
                        .await
                        .log_err();
                } else {
                    warn!("Invalid payload in inventory event: {}", inv_topic);
                }
            } else if let Some(ns_topic) = topic.strip_prefix(REPLICATION_NODE_STATE_TOPIC) {
                if let Ok(nst) = rmp_serde::from_slice::<NodeStateEvent>(frame.payload()) {
                    match nst.status {
                        NodeStatus::Online => {
                            self.core.mark_source_online(ns_topic, true).await;
                        }
                        NodeStatus::Offline => {
                            self.core.mark_source_online(ns_topic, false).await;
                        }
                        NodeStatus::Removed => self.core.destroy_source(ns_topic).await,
                    }
                } else {
                    warn!("Invalid payload in node state event: {}", ns_topic);
                }
            } else if let Some(log_topic) = topic.strip_prefix(LOG_TOPIC) {
                // Log frames with an unknown level or a non-UTF-8 message are
                // dropped without any report.
                if let Ok(lvl) = LogLevel::from_str(log_topic) {
                    if let Ok(msg) = std::str::from_utf8(frame.payload()) {
                        emit(lvl, frame.sender(), msg);
                    }
                }
            }
        }
    }

    /// Serves RPC calls.
    ///
    /// Every call is rejected with an internal "not ready" error while the
    /// core is inactive. Otherwise the method name selects the handler:
    ///
    /// * `test`, `save`, `log.purge`, `item.summary`, `svc.list` - must be
    ///   called with an empty payload
    /// * `log.get`, `action.list` - the payload is optional, defaults are
    ///   used when it is empty
    /// * all other methods require a MessagePack-encoded parameter structure
    ///
    /// A missing or unexpected payload yields a params error, an unknown
    /// method yields a method error. Payload decoding failures are logged and
    /// returned to the caller, as are errors reported by the core (with the
    /// exception of `save`, whose failure is only logged).
    // The method is a flat dispatch table; splitting it would not make it
    // easier to follow.
    #[allow(clippy::too_many_lines)]
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        if !self.core.is_active() {
            return Err(RpcError::internal(rpc_err_str("not ready")));
        }
        let payload = event.payload();
        // Shared body of `item.enable` / `item.disable`: `i` is parsed as an
        // OID mask and the enabled flag of the matching local items is set.
        macro_rules! set_enabled {
            ($value: expr) => {{
                if payload.is_empty() {
                    Err(RpcError::params(rpc_err_str("oid/mask not specified")))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let mask = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.set_local_item_enabled(&mask, $value).await?;
                    Ok(None)
                }
            }};
        }
        // Shared body of the parameterless lvar operations: `i` is parsed as
        // an OID and the given operation is applied to it.
        macro_rules! lvar_op {
            ($op: expr) => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.lvar_op(&oid, $op).await?;
                    Ok(None)
                }
            };
        }
        match event.parse_method()? {
            // Returns the serialized core.
            "test" => {
                if payload.is_empty() {
                    Ok(Some(rmp_serde::to_vec_named(&self.core)?))
                } else {
                    Err(RpcError::params(None))
                }
            }
            // A failed save is logged only, the call still succeeds.
            "save" => {
                if payload.is_empty() {
                    let _r = self.core.save().await.log_err();
                    Ok(None)
                } else {
                    Err(RpcError::params(None))
                }
            }
            "log.purge" => {
                if payload.is_empty() {
                    crate::logs::purge_log_records();
                    Ok(None)
                } else {
                    Err(RpcError::params(None))
                }
            }
            "log.get" => {
                #[derive(Deserialize, Default)]
                #[serde(deny_unknown_fields)]
                struct LogGetParams<'a> {
                    level: Option<Value>,
                    time: Option<u32>,
                    limit: Option<u32>,
                    module: Option<&'a str>,
                    #[serde(borrow)]
                    rx: Option<&'a str>,
                }
                let params: LogGetParams = if payload.is_empty() {
                    LogGetParams::default()
                } else {
                    rmp_serde::from_read_ref(payload).log_err()?
                };
                // The number of returned records is always capped.
                let limit = params.limit.unwrap_or(1000);
                let log_level = if let Some(lvl) = params.level {
                    Some(lvl.try_into().log_err()?)
                } else {
                    None
                };
                let x: Option<Regex> = if let Some(rx) = params.rx {
                    Some(Regex::new(rx).map_err(Into::<Error>::into).log_err()?)
                } else {
                    None
                };
                let filter = crate::logs::RecordFilter::new(
                    log_level,
                    params.module,
                    x.as_ref(),
                    params.time,
                    Some(limit),
                );
                Ok(Some(rmp_serde::to_vec_named(
                    &crate::logs::get_log_records(filter),
                )?))
            }
            "item.summary" => {
                if payload.is_empty() {
                    Ok(Some(rmp_serde::to_vec_named(
                        &self.core.inventory_stats().await,
                    )?))
                } else {
                    Err(RpcError::params(None))
                }
            }
            "item.create" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.create_local_item(oid).await?;
                    Ok(None)
                }
            }
            "item.destroy" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let mask: OIDMask = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core.destroy_local_items(&mask).await;
                    Ok(None)
                }
            }
            "item.deploy" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    #[derive(Deserialize)]
                    #[serde(deny_unknown_fields)]
                    struct ParamsDeploy {
                        items: Vec<Value>,
                        #[serde(default)]
                        replace: bool,
                    }
                    let params: ParamsDeploy = rmp_serde::from_read_ref(payload).log_err()?;
                    self.core
                        .deploy_local_items(params.items, params.replace)
                        .await?;
                    Ok(None)
                }
            }
            "item.undeploy" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    #[derive(Deserialize)]
                    #[serde(deny_unknown_fields)]
                    struct ParamsUndeploy {
                        items: Vec<OID>,
                    }
                    let params: ParamsUndeploy = rmp_serde::from_read_ref(payload).log_err()?;
                    let oids: Vec<&OID> = params.items.iter().collect();
                    self.core.undeploy_local_items(&oids).await;
                    Ok(None)
                }
            }
            // Returns the configuration of a single local item.
            "item.get_config" => {
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    let items = self
                        .core
                        .list_items(&oid.clone().into(), None, None, Some(NodeFilter::Local))
                        .await;
                    if items.is_empty() {
                        Err(Error::not_found(format!("item not found: {}", oid)).into())
                    } else {
                        Ok(Some(rmp_serde::to_vec_named(
                            &items[0].serialize_config().ok_or_else(|| {
                                Into::<RpcError>::into(Error::core("unable to serialize config"))
                            })?,
                        )?))
                    }
                }
            }
            "item.list" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsList<'a> {
                    #[serde(borrow)]
                    i: Option<&'a str>, // OID or OID mask (parse later)
                    node: Option<&'a str>, // source node (.local for local items only)
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsList = rmp_serde::from_read_ref(payload).log_err()?;
                // No mask given means "all items".
                let mask = if let Some(i) = p.i {
                    i.parse().map_err(Into::<Error>::into)?
                } else {
                    OIDMask::new_any()
                };
                let node_filter = p.node.map(|v| {
                    if v == crate::LOCAL_NODE_ALIAS {
                        NodeFilter::Local
                    } else {
                        NodeFilter::Remote(v)
                    }
                });
                let items = self.core.list_items(&mask, None, None, node_filter).await;
                let system_name = self.core.system_name();
                let result: Vec<BTreeMap<Value, Value>> = items
                    .iter()
                    .map(|v| v.serialize_state_full(system_name))
                    .collect();
                Ok(Some(rmp_serde::to_vec_named(&result)?))
            }
            "item.enable" => {
                set_enabled!(true)
            }
            "item.disable" => {
                set_enabled!(false)
            }
            "item.state" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsState {
                    i: Option<OIDMask>,
                    include: Option<OIDMaskList>,
                    exclude: Option<OIDMaskList>,
                    #[serde(default)]
                    full: bool,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let mut p: ParamsState = rmp_serde::from_read_ref(payload).log_err()?;
                // No mask given means "all items".
                #[allow(clippy::redundant_closure)]
                let mask = p.i.take().unwrap_or_else(|| OIDMask::new_any());
                let items = self
                    .core
                    .list_items(&mask, p.include.as_ref(), p.exclude.as_ref(), None)
                    .await;
                let system_name = self.core.system_name();
                let result: Vec<BTreeMap<Value, Value>> = if p.full {
                    items
                        .iter()
                        .map(|v| v.serialize_state_full(system_name))
                        .collect()
                } else {
                    items
                        .iter()
                        .map(|v| v.serialize_state(system_name))
                        .collect()
                };
                Ok(Some(rmp_serde::to_vec_named(&result)?))
            }
            // Launches an action. When `wait` (seconds) is given, the call
            // waits up to that long for the action trigger; the serialized
            // action result is returned whether or not the wait timed out.
            "action" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsAction<'a> {
                    #[serde(borrow)]
                    i: &'a str,
                    status: ItemStatus,
                    value: Option<Value>,
                    #[serde(default = "crate::actions::default_action_priority")]
                    priority: u8,
                    wait: Option<f64>,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsAction = rmp_serde::from_read_ref(payload).log_err()?;
                let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                // `Duration::from_secs_f64` panics on negative, non-finite or
                // overflowing input, and `wait` comes straight from the
                // caller. Validate it before the action is launched so that a
                // bad request neither starts an action nor crashes the handler.
                let wait = if let Some(secs) = p.wait {
                    if !secs.is_finite() || secs < 0.0 || secs >= Duration::MAX.as_secs_f64() {
                        return Err(RpcError::params(rpc_err_str("invalid wait period")));
                    }
                    Some(Duration::from_secs_f64(secs))
                } else {
                    None
                };
                let (uuid, trigger) = self
                    .core
                    .action(&oid, p.status, p.value, p.priority)
                    .await?;
                if let Some(wait) = wait {
                    // A timeout is not an error: the current result is
                    // reported below in either case.
                    let _r = tokio::time::timeout(wait, trigger).await;
                }
                let info = self.core.action_result_serialized(&uuid)?;
                Ok(Some(info))
            }
            "action.result" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsUuid = rmp_serde::from_read_ref(payload).log_err()?;
                let info = self.core.action_result_serialized(&p.u)?;
                Ok(Some(info))
            }
            "action.terminate" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsUuid = rmp_serde::from_read_ref(payload).log_err()?;
                self.core.terminate_action(&p.u).await?;
                Ok(None)
            }
            "action.kill" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                self.core.kill_actions(&oid).await?;
                Ok(None)
            }
            "action.list" => {
                let f: actions::Filter = if payload.is_empty() {
                    actions::Filter::default()
                } else {
                    rmp_serde::from_read_ref(payload).log_err()?
                };
                Ok(Some(
                    self.core
                        .action_manager()
                        .get_actions_filtered_serialized(&f)?,
                ))
            }
            "lvar.set" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsSet<'a> {
                    #[serde(borrow)]
                    i: &'a str, // OID (parsed below)
                    status: ItemStatus,
                    value: Option<Value>,
                }
                if payload.is_empty() {
                    Err(RpcError::params(None))
                } else {
                    let p: ParamsSet = rmp_serde::from_read_ref(payload).log_err()?;
                    let oid: OID = p.i.parse().map_err(Into::<Error>::into)?;
                    self.core
                        .lvar_op(&oid, LvarOp::Set(p.status, p.value))
                        .await?;
                    Ok(None)
                }
            }
            "lvar.reset" => {
                lvar_op!(LvarOp::Reset)
            }
            "lvar.clear" => {
                lvar_op!(LvarOp::Clear)
            }
            "lvar.toggle" => {
                lvar_op!(LvarOp::Toggle)
            }
            // For the svc.* methods `i` is a service id and is handed to the
            // service manager as is.
            "svc.deploy" => {
                #[derive(Deserialize)]
                #[serde(deny_unknown_fields)]
                struct ParamsServiceDeploy<'a> {
                    i: &'a str,
                    config: services::Config,
                }
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsServiceDeploy = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .deploy_service(p.i, p.config, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.undeploy" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .undeploy_service(p.i, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.restart" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .restart_service(p.i, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.purge" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                self.core
                    .service_manager()
                    .purge_service(p.i, self.core.system_name(), self.core.timeout())
                    .await?;
                Ok(None)
            }
            "svc.get_config" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                Ok(Some(rmp_serde::to_vec_named(
                    &self.core.service_manager().get_service_config(p.i)?,
                )?))
            }
            "svc.get_init" => {
                if payload.is_empty() {
                    return Err(RpcError::params(None));
                }
                let p: ParamsId = rmp_serde::from_read_ref(payload).log_err()?;
                Ok(Some(rmp_serde::to_vec_named(
                    &self.core.service_manager().get_service_init(
                        p.i,
                        self.core.system_name(),
                        self.core.timeout(),
                    )?,
                )?))
            }
            "svc.list" => {
                if payload.is_empty() {
                    Ok(Some(rmp_serde::to_vec_named(
                        &self
                            .core
                            .service_manager()
                            .list_services(self.core.timeout())
                            .await,
                    )?))
                } else {
                    Err(RpcError::params(None))
                }
            }
            _ => Err(RpcError::method(None)),
        }
    }
}