use crate::eapi::EAPI_VERSION;
use crate::logs::LogLevel;
use crate::registry;
use crate::tools::ErrLogger;
use crate::tools::{format_path, get_eva_dir};
use crate::{EResult, Error};
use crate::{BUILD, VERSION};
use elbus::rpc::{Rpc, RpcClient, RpcHandlers};
use elbus::QoS;
use eva_common::prelude::*;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use yedb::Database;
/// Version number written into every [`Initial`] payload built by this module and
/// required by [`Initial::read`] when a payload is loaded.
pub const SERVICE_CONFIG_VERSION: u16 = 4;
/// RPC target that all service management calls in this module are sent to.
pub const DEFAULT_LAUNCHER: &str = "eva.launcher.main";
/// Writes `msg` to the process log at level `lvl`, prefixed with the service name
/// and a single space.
#[inline]
pub fn emit(lvl: LogLevel, service: &str, msg: &str) {
    let level: log::Level = lvl.into();
    log::log!(level, "{} {}", service, msg);
}
/// Registry of configured services plus the RPC client used to reach the launcher.
///
/// A default-constructed manager has no services and no RPC client; the client is
/// supplied later through `set_rpc`.
#[derive(Default)]
pub struct Manager {
// Service configurations keyed by service name.
services: Mutex<HashMap<String, Config>>,
// RPC client; `None` until `set_rpc` has been called.
rpc: RwLock<Option<Arc<RpcClient>>>,
}
/// Request body sent to a launcher by `start_stop_service`.
#[derive(Serialize, Deserialize)]
pub struct PayloadStartStop {
/// Name of the service the request applies to.
pub name: String,
/// Initial payload built for that service.
pub initial: Initial,
}
/// Calls `method` on `launcher` for the service `name`, passing its initial payload.
///
/// The request is a MessagePack-encoded [`PayloadStartStop`]. The call is abandoned
/// if it takes longer than `timeout` plus one second.
///
/// # Errors
///
/// Fails when the payload cannot be serialized, when the deadline expires, or when
/// the RPC call itself returns an error.
async fn start_stop_service(
    launcher: &str,
    method: &str,
    name: &str,
    initial: Initial,
    timeout: Duration,
    rpc: &RpcClient,
) -> EResult<()> {
    let packed = rmp_serde::to_vec_named(&PayloadStartStop {
        name: name.to_owned(),
        initial,
    })?;
    // One extra second on top of the requested timeout.
    let deadline = timeout + Duration::from_secs(1);
    let request = rpc.call(launcher, method, packed.into(), QoS::Processed);
    tokio::time::timeout(deadline, request).await??;
    Ok(())
}
impl Manager {
/// Installs the RPC client used by all later launcher and registry calls,
/// replacing any client set previously.
#[inline]
pub async fn set_rpc(&self, rpc: Arc<RpcClient>) {
    let mut slot = self.rpc.write().await;
    *slot = Some(rpc);
}
pub fn load(&self, db: &mut Database) -> EResult<()> {
info!("loading services");
let s_key = registry::format_top_key(registry::R_SERVICE);
let s_offs = s_key.len() + 1;
let mut services = self.services.lock().unwrap();
for (n, v) in db.key_get_recursive(&s_key)? {
let name = &n[s_offs..];
debug!("loading service {}", name);
services.insert(name.to_owned(), serde_json::from_value(v)?);
}
Ok(())
}
#[inline]
pub fn get_service_init(
&self,
name: &str,
system_name: &str,
default_timeout: Duration,
) -> EResult<Initial> {
self.services.lock().unwrap().get(name).map_or_else(
|| Err(Error::not_found(format!("no such service: {}", name))),
|c| Ok(Initial::from_config(c, name, system_name, default_timeout)),
)
}
#[inline]
pub fn get_service_config(&self, name: &str) -> EResult<Config> {
self.services.lock().unwrap().get(name).map_or_else(
|| Err(Error::not_found(format!("no such service: {}", name))),
|c| Ok(c.clone()),
)
}
/// Returns one [`Info`] per registered service, sorted by service name.
///
/// The launcher is queried with its `list` method; a service the launcher did not
/// report gets [`Status::Unknown`] and no pid. A failed launcher query is logged
/// and does not abort the listing.
///
/// # Panics
///
/// Panics if no RPC client has been set or if one of the mutexes is poisoned.
pub async fn list_services(&self, timeout: Duration) -> Vec<Info> {
    // Queries a single launcher and merges its reply into the shared `info` map.
    async fn collect_service_info(
        launcher: &str,
        rpc: &RpcClient,
        info: Arc<Mutex<HashMap<String, ServiceStatus>>>,
        timeout: Duration,
    ) -> EResult<()> {
        let request = rpc.call(launcher, "list", elbus::empty_payload!(), QoS::Processed);
        let reply = tokio::time::timeout(timeout, request).await??;
        let reported: HashMap<String, ServiceStatus> =
            rmp_serde::from_read_ref(reply.payload())?;
        // The lock is taken only after the last await point.
        info.lock().unwrap().extend(reported);
        Ok(())
    }
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    let statuses: Arc<Mutex<HashMap<String, ServiceStatus>>> = <_>::default();
    let mut tasks = Vec::new();
    #[allow(clippy::single_element_loop)]
    for launcher in &[DEFAULT_LAUNCHER] {
        let rpc = rpc.clone();
        let statuses = statuses.clone();
        tasks.push(tokio::spawn(async move {
            collect_service_info(launcher, &rpc, statuses, timeout)
                .await
                .map_err(|e| error!("Unable to collect info from {}: {}", launcher, e))
        }));
    }
    for task in tasks {
        let _r = task.await.log_err();
    }
    let statuses = statuses.lock().unwrap();
    let registered = self.services.lock().unwrap();
    let mut listing: Vec<Info> = registered
        .iter()
        .map(|(name, config)| Info::from_config(config, name, statuses.get(name)))
        .collect();
    listing.sort();
    listing
}
/// Sends the launcher's `start` method for every registered service.
///
/// Delegates to `start_stop`; per-service failures are logged there and not returned.
pub async fn start(&self, system_name: &str, default_timeout: Duration) {
self.start_stop("start", system_name, default_timeout).await;
}
/// Sends the launcher's `stop` method for every registered service.
///
/// Delegates to `start_stop`; per-service failures are logged there and not returned.
pub async fn stop(&self, system_name: &str, default_timeout: Duration) {
self.start_stop("stop", system_name, default_timeout).await;
}
async fn start_stop(&self, method: &str, system_name: &str, default_timeout: Duration) {
let mut futs = Vec::new();
let mut srv = HashMap::new();
let rpc_c = self.rpc.read().await;
let rpc = rpc_c.as_ref().unwrap();
for (name, config) in self.services.lock().unwrap().iter() {
srv.insert(
name.clone(),
Initial::from_config(config, name, system_name, default_timeout),
);
}
for (name, init) in srv {
let rpc = rpc.clone();
let method = method.to_owned();
let fut = tokio::spawn(async move {
let _r = start_stop_service(
DEFAULT_LAUNCHER,
&method,
&name,
init,
default_timeout,
&rpc,
)
.await
.map_err(|e| {
Error::failed(format!(
"unable to {} {} with {}: {}",
method, name, DEFAULT_LAUNCHER, e
))
})
.log_err();
});
futs.push(fut);
}
for f in futs {
let _r = f.await;
}
}
/// Restarts the service `name` by sending the launcher's `start` method with a
/// freshly built initial payload.
///
/// # Errors
///
/// Returns a "not found" error for an unknown service. A failed launcher call is
/// logged and returned as a "failed" error.
///
/// # Panics
///
/// Panics if no RPC client has been set.
#[inline]
pub async fn restart_service(
    &self,
    name: &str,
    system_name: &str,
    default_timeout: Duration,
) -> EResult<()> {
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    let init = self.get_service_init(name, system_name, default_timeout)?;
    let outcome = start_stop_service(
        DEFAULT_LAUNCHER,
        "start",
        name,
        init,
        default_timeout,
        rpc,
    )
    .await;
    outcome
        .map_err(|e| Error::failed(format!("unable to restart {}: {}", name, e)))
        .log_err()
}
/// Stores `config` for the service `name` in the registry and in memory, then
/// restarts the service.
///
/// # Errors
///
/// Fails when the registry write fails (nothing is stored in memory in that case)
/// or when the restart fails.
///
/// # Panics
///
/// Panics if no RPC client has been set or if the services mutex is poisoned.
pub async fn deploy_service(
    &self,
    name: &str,
    config: Config,
    system_name: &str,
    default_timeout: Duration,
) -> EResult<()> {
    {
        // Scoped so the read guard is released before `restart_service` takes its own.
        let rpc_guard = self.rpc.read().await;
        registry::key_set(
            registry::R_SERVICE,
            name,
            &config,
            rpc_guard.as_ref().unwrap(),
        )
        .await?;
    }
    self.services
        .lock()
        .unwrap()
        .insert(name.to_owned(), config);
    self.restart_service(name, system_name, default_timeout)
        .await
}
/// Stops the service `name` through the launcher's `stop_and_purge` method, then
/// deletes its configuration key, its service-data subtree and its cache subtree
/// from the registry, and finally forgets it in memory.
///
/// # Errors
///
/// Returns at the first failing step; later steps are not attempted.
///
/// # Panics
///
/// Panics if no RPC client has been set or if the services mutex is poisoned.
pub async fn purge_service(
    &self,
    name: &str,
    system_name: &str,
    default_timeout: Duration,
) -> EResult<()> {
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    let init = self.get_service_init(name, system_name, default_timeout)?;
    start_stop_service(
        DEFAULT_LAUNCHER,
        "stop_and_purge",
        name,
        init,
        default_timeout,
        rpc,
    )
    .await?;
    registry::key_delete(registry::R_SERVICE, name, rpc).await?;
    registry::key_delete_recursive(registry::R_SERVICE_DATA, name, rpc).await?;
    registry::key_delete_recursive(registry::R_CACHE, name, rpc).await?;
    let mut registered = self.services.lock().unwrap();
    registered.remove(name);
    Ok(())
}
/// Stops the service `name` through the launcher's `stop` method, deletes its
/// configuration key from the registry and forgets it in memory.
///
/// Unlike `purge_service`, no other registry keys are deleted.
///
/// # Errors
///
/// Returns at the first failing step; later steps are not attempted.
///
/// # Panics
///
/// Panics if no RPC client has been set or if the services mutex is poisoned.
pub async fn undeploy_service(
    &self,
    name: &str,
    system_name: &str,
    default_timeout: Duration,
) -> EResult<()> {
    let rpc_guard = self.rpc.read().await;
    let rpc = rpc_guard.as_ref().unwrap();
    let init = self.get_service_init(name, system_name, default_timeout)?;
    start_stop_service(DEFAULT_LAUNCHER, "stop", name, init, default_timeout, rpc).await?;
    registry::key_delete(registry::R_SERVICE, name, rpc).await?;
    let mut registered = self.services.lock().unwrap();
    registered.remove(name);
    Ok(())
}
}
/// Optional per-service timeouts, in seconds (they are converted with
/// `Duration::from_secs_f64` by the `Initial` accessors).
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct Timeout {
// Startup timeout, seconds.
startup: Option<f64>,
// Shutdown timeout, seconds.
shutdown: Option<f64>,
// General timeout, seconds.
default: Option<f64>,
}
/// Initial payload describing a service: built from its stored `Config` by
/// `Initial::from_config` and parsed from standard input by `Initial::read`.
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Initial {
// Serialized as `version`; must equal `SERVICE_CONFIG_VERSION` to pass `read`.
#[serde(rename = "version")]
config_version: u16,
system_name: String,
// Service name; also passed as the second argument of the bus client configuration.
name: String,
// Service command, resolved through `format_path` in `from_config`.
command: String,
// Set to `<eva dir>/runtime/svc_data/<name>` in `from_config`.
data_path: String,
timeout: Timeout,
core: CoreInfo,
bus: BusConfig,
// Service-specific configuration, passed through unchanged.
config: Option<Value>,
}
impl Initial {
/// Returns the configuration version carried by this payload.
#[inline]
pub fn config_version(&self) -> u16 {
self.config_version
}
/// Returns the system name carried by this payload.
#[inline]
pub fn system_name(&self) -> &str {
&self.system_name
}
/// Returns the service name.
#[inline]
pub fn name(&self) -> &str {
&self.name
}
/// Returns the service command string.
#[inline]
pub fn command(&self) -> &str {
&self.command
}
/// Returns the service data path carried by this payload.
#[inline]
pub fn data_path(&self) -> &str {
&self.data_path
}
/// Returns the general timeout of the service.
///
/// Falls back to `crate::DEFAULT_TIMEOUT` when the payload carries no `default`
/// timeout value.
#[inline]
pub fn timeout(&self) -> Duration {
    match self.timeout.default {
        Some(secs) => Duration::from_secs_f64(secs),
        None => crate::DEFAULT_TIMEOUT,
    }
}
/// Returns the time allowed for the service to start.
///
/// Uses the `startup` value of the timeout section and falls back to the general
/// timeout (`timeout()`) when it is not set.
#[inline]
pub fn startup_timeout(&self) -> Duration {
    // Read the dedicated `startup` field; `default` is only the fallback.
    self.timeout
        .startup
        .map_or_else(|| self.timeout(), Duration::from_secs_f64)
}
/// Returns the time allowed for the service to shut down.
///
/// Uses the `shutdown` value of the timeout section and falls back to the general
/// timeout (`timeout()`) when it is not set.
#[inline]
pub fn shutdown_timeout(&self) -> Duration {
    // Read the dedicated `shutdown` field; `default` is only the fallback.
    self.timeout
        .shutdown
        .map_or_else(|| self.timeout(), Duration::from_secs_f64)
}
/// Returns the bus timeout, falling back to the general timeout (`timeout()`)
/// when the bus section carries none.
#[inline]
pub fn bus_timeout(&self) -> Duration {
    match self.bus.timeout {
        Some(secs) => Duration::from_secs_f64(secs),
        None => self.timeout(),
    }
}
/// Returns the core build number carried by this payload.
#[inline]
pub fn eva_build(&self) -> u64 {
self.core.build
}
/// Returns the core version string carried by this payload.
#[inline]
pub fn eva_version(&self) -> &str {
&self.core.version
}
/// Returns the EAPI version carried by this payload.
#[inline]
pub fn eapi_version(&self) -> u16 {
// The field is spelled `eapi_verion` in `CoreInfo`; it is also its serialized name.
self.core.eapi_verion
}
/// Returns the core directory path carried by this payload.
#[inline]
pub fn eva_dir(&self) -> &str {
&self.core.path
}
/// Returns the raw numeric log level carried by this payload.
#[inline]
pub fn eva_log_level(&self) -> u8 {
self.core.log_level
}
#[inline]
pub fn eva_log_level_filter(&self) -> log::LevelFilter {
match self.core.log_level {
eva_common::LOG_LEVEL_TRACE => log::LevelFilter::Trace,
eva_common::LOG_LEVEL_DEBUG => log::LevelFilter::Debug,
eva_common::LOG_LEVEL_WARN => log::LevelFilter::Warn,
eva_common::LOG_LEVEL_ERROR => log::LevelFilter::Error,
_ => log::LevelFilter::Info,
}
}
/// Builds the elbus IPC client configuration for this service.
///
/// Uses the bus path and the service name, plus the buffer size, buffer TTL
/// (stored in microseconds), queue size and bus timeout from the bus section.
#[inline]
pub fn elbus_config(&self) -> elbus::ipc::Config {
    let buf_ttl = Duration::from_micros(self.bus.buf_ttl);
    let io_timeout = self.bus_timeout();
    elbus::ipc::Config::new(&self.bus.path, &self.name)
        .buf_size(self.bus.buf_size)
        .buf_ttl(buf_ttl)
        .queue_size(self.bus.queue_size)
        .timeout(io_timeout)
}
/// Returns the service-specific configuration, if the payload carries one.
#[inline]
pub fn config(&self) -> Option<&Value> {
self.config.as_ref()
}
/// Returns the queue size from the bus section.
#[inline]
pub fn elbus_queue_size(&self) -> usize {
self.bus.queue_size
}
/// Moves the service-specific configuration out of the payload, leaving `None`
/// behind; later calls to this method or to `config()` return `None`.
#[inline]
pub fn take_config(&mut self) -> Option<Value> {
self.config.take()
}
/// Connects to the bus with `elbus_config()` and wraps the connection in an
/// `RpcClient` that dispatches to `handlers`.
///
/// # Errors
///
/// Fails when connecting takes longer than `bus_timeout()` or when the connection
/// attempt itself fails.
pub async fn init_rpc<R>(&self, handlers: R) -> EResult<RpcClient>
where
    R: RpcHandlers + Send + Sync + 'static,
{
    let deadline = self.bus_timeout();
    let bus_config = self.elbus_config();
    let client =
        tokio::time::timeout(deadline, elbus::ipc::Client::connect(&bus_config)).await??;
    Ok(RpcClient::new(client, handlers))
}
fn from_config(c: &Config, name: &str, system_name: &str, default_timeout: Duration) -> Self {
let dir_eva = get_eva_dir();
let mut timeout = c.timeout.clone();
if timeout.default.is_none() {
timeout.default = Some(default_timeout.as_secs_f64());
}
if timeout.startup.is_none() {
timeout.startup = Some(default_timeout.as_secs_f64());
}
if timeout.shutdown.is_none() {
timeout.shutdown = Some(default_timeout.as_secs_f64());
}
let mut bus = c.bus.clone();
if bus.timeout.is_none() {
bus.timeout = Some(elbus::ipc::DEFAULT_TIMEOUT.as_secs_f64());
}
bus.path = format_path(&dir_eva, Some(&bus.path), None);
Self {
config_version: SERVICE_CONFIG_VERSION,
system_name: system_name.to_owned(),
name: name.to_owned(),
command: format_path(&dir_eva, Some(&c.command), None),
data_path: format!("{}/runtime/svc_data/{}", dir_eva, name),
timeout: c.timeout.clone(),
core: CoreInfo {
build: BUILD,
version: VERSION.to_owned(),
eapi_verion: EAPI_VERSION,
path: dir_eva,
log_level: crate::logs::get_min_log_level().0,
},
bus,
config: c.config.clone(),
}
}
/// Reads a MessagePack-encoded initial payload from standard input.
///
/// Standard input is read to its end, for at most `crate::DEFAULT_TIMEOUT`.
///
/// # Errors
///
/// Fails when reading times out or errors, when the data cannot be decoded, or
/// with a "not implemented" error when the configuration version or the EAPI
/// version differs from the one this crate supports.
pub async fn read() -> EResult<Self> {
    let mut stdin = tokio::io::stdin();
    let mut buf: Vec<u8> = Vec::new();
    tokio::time::timeout(crate::DEFAULT_TIMEOUT, stdin.read_to_end(&mut buf)).await??;
    let initial: Initial = rmp_serde::from_read_ref(&buf)?;
    if initial.config_version() != SERVICE_CONFIG_VERSION {
        return Err(Error::not_implemented(format!(
            "config version not supported: {}",
            initial.config_version()
        )));
    }
    if initial.eapi_version() != EAPI_VERSION {
        // Report the EAPI version that was rejected, not the configuration version.
        return Err(Error::not_implemented(format!(
            "EAPI version not supported: {}",
            initial.eapi_version(),
        )));
    }
    Ok(initial)
}
}
/// Information about the core, embedded in every `Initial` payload.
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CoreInfo {
build: u64,
version: String,
// NOTE(review): spelled `eapi_verion`; with no serde rename this is also the
// serialized field name, so correcting the spelling would change the wire format.
eapi_verion: u16,
// Core directory, as returned by `get_eva_dir()` when the payload is built.
path: String,
// Numeric log level; see `Initial::eva_log_level_filter` for the mapping.
log_level: u8,
}
/// Stored configuration of a single service, as kept by `Manager`.
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
// Service command; resolved through `format_path` when the initial payload is built.
command: String,
// Optional timeouts; an absent section deserializes to all-`None`.
#[serde(default)]
timeout: Timeout,
bus: BusConfig,
// Service-specific configuration, copied into the initial payload unchanged.
config: Option<Value>,
}
/// Serde default for `BusConfig::tp`: the bus type `"elbus"`.
#[inline]
fn default_bus_type() -> String {
    String::from("elbus")
}
/// Serde default for `BusConfig::buf_size`: the elbus IPC default buffer size.
#[inline]
fn default_elbus_buf_size() -> usize {
elbus::ipc::DEFAULT_BUF_SIZE
}
/// Serde default for `BusConfig::buf_ttl`: the elbus IPC default buffer TTL,
/// expressed in microseconds.
// `as_micros()` returns `u128`; the lint is silenced because the narrowing cast
// to `u64` is intentional here.
#[allow(clippy::cast_possible_truncation)]
#[inline]
fn default_elbus_buf_ttl() -> u64 {
elbus::ipc::DEFAULT_BUF_TTL.as_micros() as u64
}
/// Serde default for `BusConfig::queue_size`: the elbus IPC default queue size.
#[inline]
fn default_elbus_queue_size() -> usize {
elbus::ipc::DEFAULT_QUEUE_SIZE
}
/// Serde default for `BusConfig::ping_interval`: `1.0`.
#[inline]
fn default_elbus_ping_interval() -> f64 {
    1.0_f64
}
/// Bus connection settings of a service.
///
/// Unknown fields are rejected on deserialization.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct BusConfig {
    /// Bus type, serialized as `type`; defaults to `"elbus"`.
    #[serde(rename = "type", default = "default_bus_type")]
    tp: String,
    /// Bus path; resolved against the core directory when the initial payload is built.
    path: String,
    /// Bus timeout in seconds, if set.
    timeout: Option<f64>,
    /// Buffer size handed to the elbus client configuration.
    #[serde(default = "default_elbus_buf_size")]
    buf_size: usize,
    /// Buffer TTL in microseconds.
    #[serde(default = "default_elbus_buf_ttl")]
    buf_ttl: u64,
    /// Queue size handed to the elbus client configuration.
    #[serde(default = "default_elbus_queue_size")]
    queue_size: usize,
    /// Ping interval; defaults to `1.0` (presumably seconds — confirm).
    #[serde(default = "default_elbus_ping_interval")]
    ping_interval: f64,
}
/// One entry of the service listing returned by `Manager::list_services`.
///
/// Equality compares all fields, while ordering (see the `Ord` impl) compares the
/// name only.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct Info {
name: String,
// Launcher name; always `DEFAULT_LAUNCHER` when built by `Info::from_config`.
launcher: String,
status: Status,
// Process id reported by the launcher, if any.
pid: Option<u32>,
}
/// Service status as shown in the service listing; serialized in lowercase.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Status {
// Used by `Info::from_config` when the launcher reported nothing for a service.
Unknown,
Offline,
Starting,
Terminating,
Online,
// Result of `From<u8>` for any code other than 0, 1 and 0xef.
Private,
}
impl Info {
    /// Builds the listing entry for the service `name`.
    ///
    /// The status and pid are copied from the launcher report when one is given;
    /// otherwise the status is [`Status::Unknown`] and there is no pid. The
    /// launcher is always [`DEFAULT_LAUNCHER`]; the configuration is not used.
    fn from_config(_c: &Config, name: &str, status: Option<&ServiceStatus>) -> Self {
        let (state, pid) = status.map_or((Status::Unknown, None), |reported| {
            (reported.status, reported.pid)
        });
        Self {
            name: name.to_owned(),
            launcher: DEFAULT_LAUNCHER.to_owned(),
            status: state,
            pid,
        }
    }
}
impl Ord for Info {
    /// Orders entries by service name only; the other fields are ignored.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(self.name.as_str(), other.name.as_str())
    }
}
impl PartialOrd for Info {
    /// Delegates to the total order defined by [`Ord`], which compares names only.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// Descriptive metadata of a service: author, version and description.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServiceInfo {
author: String,
version: String,
description: String,
}
impl ServiceInfo {
    /// Creates service metadata holding owned copies of the given strings.
    pub fn new(author: &str, version: &str, description: &str) -> Self {
        let author = String::from(author);
        let version = String::from(version);
        let description = String::from(description);
        Self {
            author,
            version,
            description,
        }
    }
}
/// Per-service entry of a launcher's `list` reply, as decoded by
/// `Manager::list_services`.
#[derive(Serialize, Deserialize)]
pub struct ServiceStatus {
/// Process id, if the launcher reported one.
pub pid: Option<u32>,
/// Status reported by the launcher.
pub status: Status,
}
/// Serializable event carrying a single service status value.
#[derive(Serialize, Deserialize)]
pub struct ServiceStatusBroadcastEvent {
/// The status being announced.
pub status: ServiceStatusBroadcast,
}
impl ServiceStatusBroadcastEvent {
/// Creates an event with the status `Ready`.
#[inline]
pub fn ready() -> Self {
Self {
status: ServiceStatusBroadcast::Ready,
}
}
/// Creates an event with the status `Terminating`.
#[inline]
pub fn terminating() -> Self {
Self {
status: ServiceStatusBroadcast::Terminating,
}
}
}
/// Status values a service can announce; each variant has a fixed `u8` code and
/// serializes as its lowercase name.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[repr(u8)]
pub enum ServiceStatusBroadcast {
Starting = 0,
Ready = 1,
Terminating = 0xef,
Unknown = 0xff,
}
impl From<u8> for Status {
fn from(s: u8) -> Status {
match s {
0 => Status::Starting,
1 => Status::Online,
0xef => Status::Terminating,
_ => Status::Private,
}
}
}
impl fmt::Display for ServiceStatusBroadcast {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
match self {
ServiceStatusBroadcast::Starting => "starting",
ServiceStatusBroadcast::Ready => "ready",
ServiceStatusBroadcast::Terminating => "terminating",
ServiceStatusBroadcast::Unknown => "unknown",
}
)
}
}