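//! Service launcher: spawns service processes, supervises them and serves the
//! `list` / `start` / `stop` / `stop_and_purge` RPC methods.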
use crate::services::Initial;
use crate::tools::ErrLogger;
use crate::{EResult, Error};
use elbus::client::AsyncClient;
use elbus::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use elbus::{Frame, FrameKind, QoS};
use log::{debug, error, info};
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::io::{BufReader, BufWriter};
use tokio::process::Command;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::sleep;

/// Pause made by the runner loop after a launch attempt returns, before the next attempt
const RESTART_PERIOD: Duration = Duration::from_secs(1);

/// Runtime state of a launched service process
///
/// Stored by `Service::launch` while the process is running; dropping the value aborts
/// all helper tasks listed in `tasks`.
struct ServiceRuntimeData {
    /// Process id of the spawned service process
    pid: u32,
    /// Helper tasks spawned for the process (pinger, stdout/stderr loggers, readiness check)
    tasks: Vec<JoinHandle<()>>,
    /// Value passed to `bmart::process::kill_pstree` when the process tree is terminated
    shutdown_timeout: Duration,
    /// Last status code stored from service broadcast frames; `0` is treated as "not ready"
    status: Arc<atomic::AtomicU8>,
}

impl Drop for ServiceRuntimeData {
    /// Cancels every helper task that was registered for the process
    fn drop(&mut self) {
        self.tasks.iter().for_each(|task| task.abort());
    }
}

/// A managed service: its name, shared runtime state and the task which keeps it running
struct Service {
    /// Service name, used as the RPC target for pings and as the log prefix
    name: String,
    /// Runtime data of the currently running process, `None` while no process is registered
    data: Arc<Mutex<Option<ServiceRuntimeData>>>,
    /// Restart flag: the runner loop keeps relaunching the process while this is `true`
    active: Arc<atomic::AtomicBool>,
    /// Handle of the spawned runner task, set by `start` and aborted by `stop`
    runner_fut: Option<JoinHandle<()>>,
}

impl Service {
    fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            data: <_>::default(),
            active: <_>::default(),
            runner_fut: None,
        }
    }
    async fn launch(
        name: &str,
        command: &str,
        args: &[&str],
        initial: &Initial,
        rpc: Arc<RwLock<Option<RpcClient>>>,
        service_data: &Mutex<Option<ServiceRuntimeData>>,
    ) -> EResult<()> {
        let mut child = Command::new(command)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(false)
            .args(args)
            .spawn()?;
        let stdin = child
            .stdin
            .take()
            .ok_or_else(|| Error::io("unable to get stdin"))?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| Error::io("unable to get stdout"))?;
        let stderr = child
            .stderr
            .take()
            .ok_or_else(|| Error::io("unable to get stderr"))?;
        let mut stdout_reader = BufReader::new(stdout).lines();
        let mut stderr_reader = BufReader::new(stderr).lines();
        let pid = child
            .id()
            .ok_or_else(|| Error::io("unable to get process id"))?;
        {
            let mut stdin_writer = BufWriter::new(stdin);
            stdin_writer
                .write_all(&rmp_serde::to_vec_named(initial)?)
                .await?;
            stdin_writer.flush().await?;
        }
        let startup_timeout = initial.startup_timeout();
        let shutdown_timeout = initial.shutdown_timeout();
        let timeout = initial.timeout();
        let svc_name = name.to_owned();
        {
            let mut r_data_c = service_data.lock().await;
            let pinger_fut = tokio::spawn(async move {
                async fn ping(
                    name: &str,
                    rpc: &RwLock<Option<RpcClient>>,
                    timeout: Duration,
                ) -> EResult<()> {
                    tokio::time::timeout(
                        timeout,
                        rpc.read().await.as_ref().unwrap().call(
                            name,
                            "test",
                            elbus::empty_payload!(),
                            QoS::Processed,
                        ),
                    )
                    .await??;
                    Ok(())
                }
                sleep(startup_timeout).await;
                loop {
                    if ping(&svc_name, &rpc, timeout).await.log_err().is_err() {
                        bmart::process::kill_pstree(pid, Some(shutdown_timeout), true).await;
                        break;
                    }
                    sleep(timeout).await;
                }
            });
            let svc_name = name.to_owned();
            let stdout_fut = tokio::spawn(async move {
                while let Ok(Some(line)) = stdout_reader.next_line().await {
                    info!("{} {}", svc_name, line);
                }
            });
            let svc_name = name.to_owned();
            let stderr_fut = tokio::spawn(async move {
                while let Ok(Some(line)) = stderr_reader.next_line().await {
                    error!("{} {}", svc_name, line);
                }
            });
            let status_beacon: Arc<atomic::AtomicU8> = <_>::default();
            let b = status_beacon.clone();
            let svc_name = name.to_owned();
            let ready_fut = tokio::spawn(async move {
                sleep(startup_timeout).await;
                if b.load(atomic::Ordering::SeqCst) == 0 {
                    error!("service {} is not ready, terminating", svc_name);
                    bmart::process::kill_pstree(pid, None, true).await;
                }
            });
            let r_data: ServiceRuntimeData = ServiceRuntimeData {
                pid,
                tasks: vec![pinger_fut, stdout_fut, stderr_fut, ready_fut],
                shutdown_timeout,
                status: status_beacon,
            };
            r_data_c.replace(r_data);
        }
        let result = child.wait().await;
        service_data.lock().await.take();
        result?;
        Ok(())
    }
    /// Keeps the service process running until `active` is cleared.
    ///
    /// The command line from `initial` is split on whitespace: the first token is the
    /// program, the remaining tokens are its arguments. The process is then launched in a
    /// loop; a failed launch is logged and the next attempt is made after
    /// `RESTART_PERIOD`.
    ///
    /// # Errors
    ///
    /// Returns an invalid-data error if the command line contains no program.
    async fn runner(
        name: &str,
        initial: &Initial,
        rpc: Arc<RwLock<Option<RpcClient>>>,
        active: Arc<atomic::AtomicBool>,
        service_data: Arc<Mutex<Option<ServiceRuntimeData>>>,
    ) -> EResult<()> {
        // `split(' ')` always yields at least one (possibly empty) token, so a blank
        // command was never detected and was retried forever; it also produced empty
        // arguments for repeated spaces. `split_whitespace` avoids both.
        let mut tokens = initial.command().split_whitespace();
        let command = tokens
            .next()
            .ok_or_else(|| Error::invalid_data("command not specified"))?;
        let args: Vec<&str> = tokens.collect();
        while active.load(atomic::Ordering::SeqCst) {
            let _r = Self::launch(name, command, &args, initial, rpc.clone(), &service_data)
                .await
                .map_err(|e| Error::io(format!("unable to launch {} service: {}", name, e)))
                .log_err();
            sleep(RESTART_PERIOD).await;
        }
        Ok(())
    }
    /// Marks the service active and spawns the runner task which launches the process
    ///
    /// Errors returned by the runner are logged inside the spawned task.
    async fn start(&mut self, initial: Initial, rpc: Arc<RwLock<Option<RpcClient>>>) {
        self.active.store(true, atomic::Ordering::SeqCst);
        let svc_name = self.name.clone();
        let active_flag = Arc::clone(&self.active);
        let runtime = Arc::clone(&self.data);
        let handle = tokio::spawn(async move {
            let outcome = Self::runner(&svc_name, &initial, rpc, active_flag, runtime).await;
            let _r = outcome.log_err();
        });
        self.runner_fut = Some(handle);
    }
    async fn stop(&self) {
        self.active.store(false, atomic::Ordering::SeqCst);
        let data = self.data.lock().await.take();
        if let Some(ref fut) = self.runner_fut {
            fut.abort();
        }
        if let Some(service_data) = data {
            bmart::process::kill_pstree(
                service_data.pid,
                Some(service_data.shutdown_timeout),
                true,
            )
            .await;
        }
    }
}

/// RPC handlers of the launcher: the service registry and a shared slot for the RPC client
#[derive(Default)]
struct Handlers {
    /// Managed services, keyed by service name
    services: Mutex<HashMap<String, Service>>,
    /// RPC client slot, filled by `init` after the client is created; shared with the
    /// service pinger tasks
    rpc: Arc<RwLock<Option<RpcClient>>>,
}

#[async_trait::async_trait]
impl RpcHandlers for Handlers {
    /// Dispatches an incoming RPC call by method name
    ///
    /// * `list` - returns a serialized map of service name to pid and status
    /// * `start` - stops a previously registered service with the same name (if any),
    ///   creates the data directory and starts the service
    /// * `stop` - removes the service from the registry and stops it
    /// * `stop_and_purge` - same as `stop`, then removes the data directory
    ///
    /// Any other method is rejected with `RpcError::method`.
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        use crate::services::{PayloadStartStop, ServiceStatus};
        let method = event.parse_method()?;
        match method {
            "list" => {
                let mut report: HashMap<&str, ServiceStatus> = HashMap::new();
                let registry = self.services.lock().await;
                for (svc_name, svc) in registry.iter() {
                    let runtime = svc.data.lock().await;
                    // No runtime data means the process is not registered (yet)
                    let entry = match runtime.as_ref() {
                        Some(d) => ServiceStatus {
                            pid: Some(d.pid),
                            status: d.status.load(atomic::Ordering::SeqCst).into(),
                        },
                        None => ServiceStatus {
                            pid: None,
                            status: crate::services::Status::Starting,
                        },
                    };
                    report.insert(svc_name, entry);
                }
                Ok(Some(rmp_serde::to_vec_named(&report)?))
            }
            "start" => {
                let rpc_ready = self.rpc.read().await.is_some();
                if !rpc_ready {
                    return Err(Error::failed("rpc not initialized yet").into());
                }
                // The registry stays locked for the whole replace operation
                let mut registry = self.services.lock().await;
                let p: PayloadStartStop = rmp_serde::from_read_ref(event.payload())?;
                debug!("starting service {}", p.name);
                if let Some(previous) = registry.remove(&p.name) {
                    previous.stop().await;
                }
                let mut service = Service::new(&p.name);
                tokio::fs::create_dir_all(&p.initial.data_path()).await?;
                service.start(p.initial, self.rpc.clone()).await;
                registry.insert(p.name, service);
                Ok(None)
            }
            "stop" => {
                let p: PayloadStartStop = rmp_serde::from_read_ref(event.payload())?;
                debug!("stopping service {}", p.name);
                // The registry lock is released before the service is stopped
                let removed = self.services.lock().await.remove(&p.name);
                if let Some(service) = removed {
                    service.stop().await;
                }
                Ok(None)
            }
            "stop_and_purge" => {
                let p: PayloadStartStop = rmp_serde::from_read_ref(event.payload())?;
                debug!("stopping service {}", p.name);
                // The registry lock is released before the service is stopped
                let removed = self.services.lock().await.remove(&p.name);
                if let Some(service) = removed {
                    service.stop().await;
                }
                debug!("purging service data {}", p.name);
                tokio::fs::remove_dir_all(&p.initial.data_path()).await?;
                Ok(None)
            }
            _ => Err(RpcError::method(None)),
        }
    }
    /// Notifications are ignored by the launcher
    async fn handle_notification(&self, _event: RpcEvent) {}
    /// Records the status announced by a service in a broadcast frame
    ///
    /// Only broadcast frames whose sender starts with `eva.svc.` are considered. Frames
    /// with a payload that does not decode as a status event, frames from senders that
    /// are not registered, and frames for services without runtime data are ignored.
    async fn handle_frame(&self, frame: Frame) {
        let relevant =
            frame.kind() == FrameKind::Broadcast && frame.sender().starts_with("eva.svc.");
        if !relevant {
            return;
        }
        let event = match rmp_serde::from_slice::<crate::services::ServiceStatusBroadcastEvent>(
            frame.payload(),
        ) {
            Ok(decoded) => decoded,
            Err(_) => return,
        };
        // The registry lock is held until the status has been stored
        let registry = self.services.lock().await;
        let svc = match registry.get(frame.sender()) {
            Some(found) => found,
            None => return,
        };
        if let Some(data) = svc.data.lock().await.as_ref() {
            data.status
                .store(event.status as u8, atomic::Ordering::SeqCst);
        }
    }
}

pub async fn init<C>(client: C)
where
    C: AsyncClient + 'static,
{
    let handlers = Handlers::default();
    let handlers_rpc = handlers.rpc.clone();
    let rpc = RpcClient::new(client, handlers);
    handlers_rpc.write().await.replace(rpc);
}