//! Service management: service configuration types, launcher RPC payloads and the service manager.
use crate::eapi::EAPI_VERSION;
use crate::logs::LogLevel;
use crate::registry;
use crate::tools::ErrLogger;
use crate::tools::{format_path, get_eva_dir};
use crate::{EResult, Error};
use crate::{BUILD, VERSION};
use elbus::rpc::{Rpc, RpcClient, RpcHandlers};
use elbus::QoS;
use eva_common::prelude::*;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use yedb::Database;

/// Version of the service configuration payload.
///
/// Stored in every [`Initial`] built from a [`Config`] and compared against the received value
/// in [`Initial::read`].
pub const SERVICE_CONFIG_VERSION: u16 = 4;

/// RPC target name of the launcher all service management calls in this module are sent to.
pub const DEFAULT_LAUNCHER: &str = "eva.launcher.main";

/// Writes a message attributed to a service into the log.
///
/// The record text is `"<service> <msg>"`, emitted at the level obtained by converting `lvl`.
#[inline]
pub fn emit(lvl: LogLevel, service: &str, msg: &str) {
    let level: log::Level = lvl.into();
    log::log!(level, "{} {}", service, msg);
}

/// Registry of deployed services.
///
/// Keeps the loaded service configurations and the RPC client used to reach the launcher and the
/// registry.
#[derive(Default)]
pub struct Manager {
    /// Service configurations keyed by service name
    services: Mutex<HashMap<String, Config>>,
    /// RPC client; `None` until [`Manager::set_rpc`] is called
    rpc: RwLock<Option<Arc<RpcClient>>>,
}

/// Payload of the launcher calls issued by this module (`start`, `stop`, `stop_and_purge`).
#[derive(Serialize, Deserialize)]
pub struct PayloadStartStop {
    /// Name of the service the call applies to
    pub name: String,
    /// Initial payload built from the service configuration
    pub initial: Initial,
}

/// Asks `launcher` to execute `method` for the service `name`.
///
/// The service name and its [`Initial`] payload are serialized with MessagePack (named fields)
/// and sent as the call parameters with `QoS::Processed`. The call is abandoned when it takes
/// longer than `timeout` plus one second.
///
/// # Errors
///
/// Returns an error if the payload can not be serialized, the deadline elapses or the RPC call
/// fails.
async fn start_stop_service(
    launcher: &str,
    method: &str,
    name: &str,
    initial: Initial,
    timeout: Duration,
    rpc: &RpcClient,
) -> EResult<()> {
    // give it 1 sec more to let elbus call pass
    let deadline = timeout + Duration::from_secs(1);
    let packed = rmp_serde::to_vec_named(&PayloadStartStop {
        name: name.to_owned(),
        initial,
    })?;
    let call = rpc.call(launcher, method, packed.into(), QoS::Processed);
    tokio::time::timeout(deadline, call).await??;
    Ok(())
}

impl Manager {
    /// Stores the RPC client used for launcher and registry calls, replacing any previous one.
    #[inline]
    pub async fn set_rpc(&self, rpc: Arc<RpcClient>) {
        let mut slot = self.rpc.write().await;
        *slot = Some(rpc);
    }
    /// # Panics
    ///
    /// Will panic if the mutex is poisoned
    pub fn load(&self, db: &mut Database) -> EResult<()> {
        info!("loading services");
        let s_key = registry::format_top_key(registry::R_SERVICE);
        let s_offs = s_key.len() + 1;
        let mut services = self.services.lock().unwrap();
        for (n, v) in db.key_get_recursive(&s_key)? {
            let name = &n[s_offs..];
            debug!("loading service {}", name);
            services.insert(name.to_owned(), serde_json::from_value(v)?);
        }
        Ok(())
    }
    /// # Panics
    ///
    /// Will panic if the mutex is poisoned
    #[inline]
    pub fn get_service_init(
        &self,
        name: &str,
        system_name: &str,
        default_timeout: Duration,
    ) -> EResult<Initial> {
        self.services.lock().unwrap().get(name).map_or_else(
            || Err(Error::not_found(format!("no such service: {}", name))),
            |c| Ok(Initial::from_config(c, name, system_name, default_timeout)),
        )
    }
    /// # Panics
    ///
    /// Will panic if the mutex is poisoned
    #[inline]
    pub fn get_service_config(&self, name: &str) -> EResult<Config> {
        self.services.lock().unwrap().get(name).map_or_else(
            || Err(Error::not_found(format!("no such service: {}", name))),
            |c| Ok(c.clone()),
        )
    }
    /// Lists all registered services together with the state reported by the launcher.
    ///
    /// The launcher is asked for its service list, waiting at most `timeout`. If the request
    /// fails, the failure is logged and the affected services are reported as
    /// [`Status::Unknown`]. The result is sorted by service name.
    ///
    /// # Panics
    ///
    /// Will panic if the local RPC is not set or a mutex is poisoned
    pub async fn list_services(&self, timeout: Duration) -> Vec<Info> {
        /// Queries a single launcher and merges the statuses it reports into `info`.
        async fn collect_service_info(
            launcher: &str,
            rpc: &RpcClient,
            info: Arc<Mutex<HashMap<String, ServiceStatus>>>,
            timeout: Duration,
        ) -> EResult<()> {
            let reply = tokio::time::timeout(
                timeout,
                rpc.call(launcher, "list", elbus::empty_payload!(), QoS::Processed),
            )
            .await??;
            let reported: HashMap<String, ServiceStatus> =
                rmp_serde::from_read_ref(reply.payload())?;
            info.lock().unwrap().extend(reported);
            Ok(())
        }
        let rpc_guard = self.rpc.read().await;
        let rpc = rpc_guard.as_ref().unwrap();
        let statuses: Arc<Mutex<HashMap<String, ServiceStatus>>> = <_>::default();
        let mut tasks = Vec::new();
        // written as a loop although there is currently a single launcher only
        #[allow(clippy::single_element_loop)]
        for launcher in &[DEFAULT_LAUNCHER] {
            let rpc = rpc.clone();
            let statuses = statuses.clone();
            tasks.push(tokio::spawn(async move {
                collect_service_info(launcher, &rpc, statuses, timeout)
                    .await
                    .map_err(|e| error!("Unable to collect info from {}: {}", launcher, e))
            }));
        }
        for task in tasks {
            let _r = task.await.log_err();
        }
        let statuses = statuses.lock().unwrap();
        let mut listing: Vec<Info> = self
            .services
            .lock()
            .unwrap()
            .iter()
            .map(|(name, config)| Info::from_config(config, name, statuses.get(name)))
            .collect();
        listing.sort();
        listing
    }
    /// Asks the launcher to start every registered service.
    ///
    /// Delegates to `start_stop` with the `"start"` method and returns when all calls completed.
    pub async fn start(&self, system_name: &str, default_timeout: Duration) {
        self.start_stop("start", system_name, default_timeout).await;
    }
    /// Asks the launcher to stop every registered service.
    ///
    /// Delegates to `start_stop` with the `"stop"` method and returns when all calls completed.
    pub async fn stop(&self, system_name: &str, default_timeout: Duration) {
        self.start_stop("stop", system_name, default_timeout).await;
    }
    async fn start_stop(&self, method: &str, system_name: &str, default_timeout: Duration) {
        let mut futs = Vec::new();
        let mut srv = HashMap::new();
        let rpc_c = self.rpc.read().await;
        let rpc = rpc_c.as_ref().unwrap();
        for (name, config) in self.services.lock().unwrap().iter() {
            srv.insert(
                name.clone(),
                Initial::from_config(config, name, system_name, default_timeout),
            );
        }
        for (name, init) in srv {
            let rpc = rpc.clone();
            let method = method.to_owned();
            let fut = tokio::spawn(async move {
                let _r = start_stop_service(
                    DEFAULT_LAUNCHER,
                    &method,
                    &name,
                    init,
                    default_timeout,
                    &rpc,
                )
                .await
                .map_err(|e| {
                    Error::failed(format!(
                        "unable to {} {} with {}: {}",
                        method, name, DEFAULT_LAUNCHER, e
                    ))
                })
                .log_err();
            });
            futs.push(fut);
        }
        for f in futs {
            let _r = f.await;
        }
    }
    /// Restarts the service `name` by sending the launcher a `"start"` call with a freshly built
    /// [`Initial`] payload.
    ///
    /// # Errors
    ///
    /// Returns an error if the service is not registered or the launcher call fails (the latter
    /// is logged as well).
    ///
    /// # Panics
    ///
    /// Will panic if the local RPC is not set or the mutex is poisoned
    #[inline]
    pub async fn restart_service(
        &self,
        name: &str,
        system_name: &str,
        default_timeout: Duration,
    ) -> EResult<()> {
        let rpc_guard = self.rpc.read().await;
        let rpc = rpc_guard.as_ref().unwrap();
        let init = self.get_service_init(name, system_name, default_timeout)?;
        let outcome =
            start_stop_service(DEFAULT_LAUNCHER, "start", name, init, default_timeout, rpc).await;
        outcome
            .map_err(|e| Error::failed(format!("unable to restart {}: {}", name, e)))
            .log_err()
    }
    /// Deploys the service `name`: saves `config` to the registry, stores it in memory and
    /// restarts the service.
    ///
    /// # Errors
    ///
    /// Returns an error if the registry can not be updated or the restart fails. When the
    /// restart fails, the new configuration stays saved.
    ///
    /// # Panics
    ///
    /// Will panic if the local RPC is not set or the mutex is poisoned
    pub async fn deploy_service(
        &self,
        name: &str,
        config: Config,
        system_name: &str,
        default_timeout: Duration,
    ) -> EResult<()> {
        {
            // scoped: the read guard is released before restart_service acquires its own
            let rpc_guard = self.rpc.read().await;
            let rpc = rpc_guard.as_ref().unwrap();
            registry::key_set(registry::R_SERVICE, name, &config, rpc).await?;
        }
        let mut services = self.services.lock().unwrap();
        services.insert(name.to_owned(), config);
        drop(services);
        self.restart_service(name, system_name, default_timeout)
            .await
    }
    /// Stops the service `name` with the launcher `"stop_and_purge"` call and removes it
    /// completely: its registry configuration, its registry data, its registry cache and the
    /// in-memory entry.
    ///
    /// # Errors
    ///
    /// Returns an error if the service is not registered, the launcher call fails or a registry
    /// key can not be deleted. Steps after the failed one are not executed.
    ///
    /// # Panics
    ///
    /// Will panic if the local RPC is not set or the mutex is poisoned
    pub async fn purge_service(
        &self,
        name: &str,
        system_name: &str,
        default_timeout: Duration,
    ) -> EResult<()> {
        let rpc_guard = self.rpc.read().await;
        let rpc = rpc_guard.as_ref().unwrap();
        let init = self.get_service_init(name, system_name, default_timeout)?;
        start_stop_service(
            DEFAULT_LAUNCHER,
            "stop_and_purge",
            name,
            init,
            default_timeout,
            rpc,
        )
        .await?;
        // the service is down, wipe everything the registry keeps for it
        registry::key_delete(registry::R_SERVICE, name, rpc).await?;
        registry::key_delete_recursive(registry::R_SERVICE_DATA, name, rpc).await?;
        registry::key_delete_recursive(registry::R_CACHE, name, rpc).await?;
        let mut services = self.services.lock().unwrap();
        services.remove(name);
        Ok(())
    }
    /// Stops the service `name` and removes its configuration from the registry and from memory.
    ///
    /// Unlike [`Manager::purge_service`], the registry data and cache of the service are kept.
    ///
    /// # Errors
    ///
    /// Returns an error if the service is not registered, the launcher call fails or the
    /// registry key can not be deleted.
    ///
    /// # Panics
    ///
    /// Will panic if the local RPC is not set or the mutex is poisoned
    pub async fn undeploy_service(
        &self,
        name: &str,
        system_name: &str,
        default_timeout: Duration,
    ) -> EResult<()> {
        let rpc_guard = self.rpc.read().await;
        let rpc = rpc_guard.as_ref().unwrap();
        let init = self.get_service_init(name, system_name, default_timeout)?;
        start_stop_service(DEFAULT_LAUNCHER, "stop", name, init, default_timeout, rpc).await?;
        registry::key_delete(registry::R_SERVICE, name, rpc).await?;
        let mut services = self.services.lock().unwrap();
        services.remove(name);
        Ok(())
    }
}

/// Service timeouts, in seconds (the values are converted with `Duration::from_secs_f64`).
///
/// Every field is optional; unknown fields are rejected on deserialization.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct Timeout {
    /// Startup timeout
    startup: Option<f64>,
    /// Shutdown timeout
    shutdown: Option<f64>,
    /// Default timeout
    default: Option<f64>,
}

/// Initial payload of a service.
///
/// Built from a [`Config`] and sent to the launcher inside [`PayloadStartStop`];
/// [`Initial::read`] decodes it from MessagePack data read from stdin.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Initial {
    /// Payload version, serialized as `version`
    #[serde(rename = "version")]
    config_version: u16,
    /// Name of the system, as passed in by the caller
    system_name: String,
    /// Service name
    name: String,
    /// Service command, formatted against the EVA directory
    command: String,
    /// Data directory of the service (`<eva dir>/runtime/svc_data/<name>`)
    data_path: String,
    /// Service timeouts
    timeout: Timeout,
    /// Information about the core which built the payload
    core: CoreInfo,
    /// Bus connection settings
    bus: BusConfig,
    /// Service-specific configuration, if any
    config: Option<Value>,
}

impl Initial {
    /// Version of the payload format
    #[inline]
    pub fn config_version(&self) -> u16 {
        self.config_version
    }
    /// Name of the system
    #[inline]
    pub fn system_name(&self) -> &str {
        &self.system_name
    }
    /// Service name
    #[inline]
    pub fn name(&self) -> &str {
        &self.name
    }
    /// Service command
    #[inline]
    pub fn command(&self) -> &str {
        &self.command
    }
    /// Data directory of the service
    #[inline]
    pub fn data_path(&self) -> &str {
        &self.data_path
    }
    /// Default timeout of the service.
    ///
    /// Falls back to [`crate::DEFAULT_TIMEOUT`] when the payload carries no default timeout.
    #[inline]
    pub fn timeout(&self) -> Duration {
        self.timeout
            .default
            .map_or(crate::DEFAULT_TIMEOUT, Duration::from_secs_f64)
    }
    /// Startup timeout of the service.
    ///
    /// Falls back to [`Initial::timeout`] when the payload carries no startup timeout.
    #[inline]
    pub fn startup_timeout(&self) -> Duration {
        self.timeout
            .startup
            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
    }
    /// Shutdown timeout of the service.
    ///
    /// Falls back to [`Initial::timeout`] when the payload carries no shutdown timeout.
    #[inline]
    pub fn shutdown_timeout(&self) -> Duration {
        self.timeout
            .shutdown
            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
    }
    /// Bus timeout of the service.
    ///
    /// Falls back to [`Initial::timeout`] when the bus configuration carries no timeout.
    #[inline]
    pub fn bus_timeout(&self) -> Duration {
        self.bus
            .timeout
            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
    }
    /// Build number of the core which created the payload
    #[inline]
    pub fn eva_build(&self) -> u64 {
        self.core.build
    }
    /// Version of the core which created the payload
    #[inline]
    pub fn eva_version(&self) -> &str {
        &self.core.version
    }
    /// EAPI version of the core which created the payload
    #[inline]
    pub fn eapi_version(&self) -> u16 {
        self.core.eapi_verion
    }
    /// EVA directory of the core which created the payload
    #[inline]
    pub fn eva_dir(&self) -> &str {
        &self.core.path
    }
    /// Log level of the core, as a raw numeric code
    #[inline]
    pub fn eva_log_level(&self) -> u8 {
        self.core.log_level
    }
    /// Log level of the core, as a [`log::LevelFilter`].
    ///
    /// Codes other than the trace, debug, warn and error constants map to `Info`.
    #[inline]
    pub fn eva_log_level_filter(&self) -> log::LevelFilter {
        match self.core.log_level {
            eva_common::LOG_LEVEL_TRACE => log::LevelFilter::Trace,
            eva_common::LOG_LEVEL_DEBUG => log::LevelFilter::Debug,
            eva_common::LOG_LEVEL_WARN => log::LevelFilter::Warn,
            eva_common::LOG_LEVEL_ERROR => log::LevelFilter::Error,
            _ => log::LevelFilter::Info,
        }
    }
    /// Builds the elbus IPC client configuration from the bus settings of the payload.
    ///
    /// The service name is passed as the second argument of `elbus::ipc::Config::new`.
    #[inline]
    pub fn elbus_config(&self) -> elbus::ipc::Config {
        elbus::ipc::Config::new(&self.bus.path, &self.name)
            .buf_size(self.bus.buf_size)
            .buf_ttl(Duration::from_micros(self.bus.buf_ttl))
            .queue_size(self.bus.queue_size)
            .timeout(self.bus_timeout())
    }
    /// Service-specific configuration, if any
    #[inline]
    pub fn config(&self) -> Option<&Value> {
        self.config.as_ref()
    }
    /// Queue size of the bus connection
    #[inline]
    pub fn elbus_queue_size(&self) -> usize {
        self.bus.queue_size
    }
    /// Moves the service-specific configuration out of the payload, leaving `None` in its place
    #[inline]
    pub fn take_config(&mut self) -> Option<Value> {
        self.config.take()
    }
    /// Connects to the bus and wraps the connection into an RPC client using `handlers`.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection fails or is not established within the bus timeout.
    pub async fn init_rpc<R>(&self, handlers: R) -> EResult<RpcClient>
    where
        R: RpcHandlers + Send + Sync + 'static,
    {
        let bus = tokio::time::timeout(
            self.bus_timeout(),
            elbus::ipc::Client::connect(&self.elbus_config()),
        )
        .await??;
        let rpc = RpcClient::new(bus, handlers);
        Ok(rpc)
    }
    /// Builds the payload for the service `name` from its configuration.
    ///
    /// Timeouts missing in the configuration are filled with `default_timeout`, a missing bus
    /// timeout is filled with the elbus default one. The command and the bus path are formatted
    /// against the EVA directory.
    fn from_config(c: &Config, name: &str, system_name: &str, default_timeout: Duration) -> Self {
        let dir_eva = get_eva_dir();
        let fallback = default_timeout.as_secs_f64();
        // resolve the timeouts here, so the payload always carries effective values
        let mut timeout = c.timeout.clone();
        timeout.default = timeout.default.or(Some(fallback));
        timeout.startup = timeout.startup.or(Some(fallback));
        timeout.shutdown = timeout.shutdown.or(Some(fallback));
        let mut bus = c.bus.clone();
        if bus.timeout.is_none() {
            bus.timeout = Some(elbus::ipc::DEFAULT_TIMEOUT.as_secs_f64());
        }
        bus.path = format_path(&dir_eva, Some(&bus.path), None);
        Self {
            config_version: SERVICE_CONFIG_VERSION,
            system_name: system_name.to_owned(),
            name: name.to_owned(),
            command: format_path(&dir_eva, Some(&c.command), None),
            data_path: format!("{}/runtime/svc_data/{}", dir_eva, name),
            // the resolved timeouts, not the raw ones from the configuration
            timeout,
            core: CoreInfo {
                build: BUILD,
                version: VERSION.to_owned(),
                eapi_verion: EAPI_VERSION,
                path: dir_eva,
                log_level: crate::logs::get_min_log_level().0,
            },
            bus,
            config: c.config.clone(),
        }
    }
    /// Reads the payload from stdin (MessagePack, until EOF) and validates its versions.
    ///
    /// # Errors
    ///
    /// Returns an error if stdin is not read completely within [`crate::DEFAULT_TIMEOUT`], the
    /// data can not be decoded, or the payload / EAPI version differs from the supported one.
    pub async fn read() -> EResult<Self> {
        let mut stdin = tokio::io::stdin();
        let mut buf: Vec<u8> = Vec::new();
        tokio::time::timeout(crate::DEFAULT_TIMEOUT, stdin.read_to_end(&mut buf)).await??;
        let initial: Initial = rmp_serde::from_read_ref(&buf)?;
        if initial.config_version() != SERVICE_CONFIG_VERSION {
            return Err(Error::not_implemented(format!(
                "config version not supported: {}",
                initial.config_version()
            )));
        }
        if initial.eapi_version() != EAPI_VERSION {
            return Err(Error::not_implemented(format!(
                "EAPI version not supported: {}",
                initial.eapi_version(),
            )));
        }
        Ok(initial)
    }
}

/// Information about the core which built an [`Initial`] payload.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CoreInfo {
    /// Build number of the core
    build: u64,
    /// Version of the core
    version: String,
    /// EAPI version of the core.
    ///
    /// NOTE: the name is spelled without the "s"; it is serialized under this name, so renaming
    /// the field would change the serialized format.
    eapi_verion: u16,
    /// EVA directory
    path: String,
    /// Log level of the core, as a raw numeric code
    log_level: u8,
}

/// Service configuration as stored in the registry.
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
    /// Service command; formatted against the EVA directory when the payload is built
    command: String,
    /// Service timeouts; all unset when the field is omitted
    #[serde(default)]
    timeout: Timeout,
    /// Bus connection settings
    bus: BusConfig,
    /// Service-specific configuration, if any
    config: Option<Value>,
}

/// Serde default for the bus type: `"elbus"`.
#[inline]
fn default_bus_type() -> String {
    String::from("elbus")
}

/// Serde default for the bus buffer size: the elbus IPC default.
#[inline]
fn default_elbus_buf_size() -> usize {
    elbus::ipc::DEFAULT_BUF_SIZE
}

/// Serde default for the bus buffer TTL: the elbus IPC default, in microseconds.
///
/// The `u128` microsecond count is narrowed to `u64` with `as`, hence the lint exception below.
#[allow(clippy::cast_possible_truncation)]
#[inline]
fn default_elbus_buf_ttl() -> u64 {
    elbus::ipc::DEFAULT_BUF_TTL.as_micros() as u64
}

/// Serde default for the bus queue size: the elbus IPC default.
#[inline]
fn default_elbus_queue_size() -> usize {
    elbus::ipc::DEFAULT_QUEUE_SIZE
}

/// Serde default for the bus ping interval: `1.0`.
#[inline]
fn default_elbus_ping_interval() -> f64 {
    const DEFAULT_PING_INTERVAL: f64 = 1.0;
    DEFAULT_PING_INTERVAL
}

/// Bus connection settings of a service.
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BusConfig {
    /// Bus type, serialized as `type`; defaults to `"elbus"`
    #[serde(rename = "type", default = "default_bus_type")]
    tp: String,
    /// Bus path; formatted against the EVA directory when the payload is built
    path: String,
    /// Bus timeout in seconds, optional
    timeout: Option<f64>,
    /// Buffer size; defaults to the elbus IPC default
    #[serde(default = "default_elbus_buf_size")]
    buf_size: usize,
    /// Buffer TTL; defaults to the elbus IPC default
    #[serde(default = "default_elbus_buf_ttl")]
    buf_ttl: u64, // microseconds
    /// Queue size; defaults to the elbus IPC default
    #[serde(default = "default_elbus_queue_size")]
    queue_size: usize,
    /// Ping interval, defaults to `1.0` (presumably seconds - TODO confirm)
    #[serde(default = "default_elbus_ping_interval")]
    ping_interval: f64,
}

/// Entry of the service list returned by [`Manager::list_services`].
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct Info {
    /// Service name
    name: String,
    /// Name of the launcher responsible for the service
    launcher: String,
    /// Service state
    status: Status,
    /// Process ID as reported by the launcher, if any
    pid: Option<u32>,
}

/// Used by API to show the actual service state (Unknown = unable to contact the launcher)
///
/// The variants are serialized as lowercase strings.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Status {
    Unknown,
    Offline,
    Starting,
    Terminating,
    Online,
    Private,
}

impl Info {
    /// Builds the list entry of the service `name`.
    ///
    /// When a launcher status is available, its state and PID are copied; otherwise the state is
    /// [`Status::Unknown`] and there is no PID. The configuration argument is currently unused.
    fn from_config(_c: &Config, name: &str, status: Option<&ServiceStatus>) -> Self {
        let (state, pid) = status.map_or((Status::Unknown, None), |st| (st.status, st.pid));
        Self {
            name: name.to_owned(),
            launcher: DEFAULT_LAUNCHER.to_owned(),
            status: state,
            pid,
        }
    }
}

/// Entries are ordered by service name only.
impl Ord for Info {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (left, right) = (self.name.as_str(), other.name.as_str());
        left.cmp(right)
    }
}

/// Same order as [`Ord`]: by service name only.
impl PartialOrd for Info {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Descriptive information about a service.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServiceInfo {
    /// Service author
    author: String,
    /// Service version
    version: String,
    /// Service description
    description: String,
}

impl ServiceInfo {
    /// Creates the service information, copying the given strings.
    pub fn new(author: &str, version: &str, description: &str) -> Self {
        Self {
            author: String::from(author),
            version: String::from(version),
            description: String::from(description),
        }
    }
}

/// Status of a single service, as contained in the reply of the launcher `list` call.
#[derive(Serialize, Deserialize)]
pub struct ServiceStatus {
    /// Process ID, if any
    pub pid: Option<u32>,
    /// Service state
    pub status: Status,
}

/// Event carrying a [`ServiceStatusBroadcast`] state.
#[derive(Serialize, Deserialize)]
pub struct ServiceStatusBroadcastEvent {
    /// The state being announced
    pub status: ServiceStatusBroadcast,
}

impl ServiceStatusBroadcastEvent {
    /// Creates an event announcing the `Ready` state
    #[inline]
    pub fn ready() -> Self {
        let status = ServiceStatusBroadcast::Ready;
        Self { status }
    }
    /// Creates an event announcing the `Terminating` state
    #[inline]
    pub fn terminating() -> Self {
        let status = ServiceStatusBroadcast::Terminating;
        Self { status }
    }
}

/// Used by services and the core to notify about its state
///
/// Serialized by serde as a lowercase variant name; the explicit discriminants fix the `u8`
/// representation of every variant.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[repr(u8)]
pub enum ServiceStatusBroadcast {
    Starting = 0,
    Ready = 1,
    Terminating = 0xef,
    Unknown = 0xff,
}

impl From<u8> for Status {
    fn from(s: u8) -> Status {
        match s {
            0 => Status::Starting,
            1 => Status::Online,
            0xef => Status::Terminating,
            _ => Status::Private,
        }
    }
}

impl fmt::Display for ServiceStatusBroadcast {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                ServiceStatusBroadcast::Starting => "starting",
                ServiceStatusBroadcast::Ready => "ready",
                ServiceStatusBroadcast::Terminating => "terminating",
                ServiceStatusBroadcast::Unknown => "unknown",
            }
        )
    }
}