// eva-common 0.1.185
//
// Commons for EVA ICS v4
// Documentation
use crate::registry;
use crate::Value;
use crate::{EResult, Error};
use busrt::rpc::{self, RpcClient, RpcHandlers};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ffi::CString;
use std::fmt;
#[cfg(feature = "extended-value")]
use std::path::Path;
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;

/// Service configuration format version; `Initial::new` stores it in the
/// payload field serialized as `version`.
pub const SERVICE_CONFIG_VERSION: u16 = 4;

/// Payload code `0`. Presumably identifies a ping payload — it is not used in
/// this file, confirm against the code that exchanges payloads.
pub const SERVICE_PAYLOAD_PING: u8 = 0;
/// Payload code `1`. Presumably identifies a payload carrying `Initial` — it is
/// not used in this file, confirm against the code that exchanges payloads.
pub const SERVICE_PAYLOAD_INITIAL: u8 = 1;

/// Handle for registry calls made through an RPC client.
///
/// Built by `Initial::init_registry`. Most methods address keys under the
/// prefix returned by `registry::format_svc_data_subkey` for `id`.
pub struct Registry {
    // Service id the key prefix is derived from.
    id: String,
    // Shared RPC client handed to every registry call.
    rpc: Arc<RpcClient>,
}

impl Registry {
    #[inline]
    pub async fn key_set<V>(&self, key: &str, value: V) -> EResult<Value>
    where
        V: Serialize,
    {
        registry::key_set(
            &registry::format_svc_data_subkey(&self.id),
            key,
            value,
            &self.rpc,
        )
        .await
    }
    #[inline]
    pub async fn key_get(&self, key: &str) -> EResult<Value> {
        registry::key_get(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
    }
    #[inline]
    pub async fn key_userdata_get(&self, key: &str) -> EResult<Value> {
        registry::key_get(registry::R_USER_DATA, key, &self.rpc).await
    }
    #[inline]
    pub async fn key_increment(&self, key: &str) -> EResult<i64> {
        registry::key_increment(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
    }

    #[inline]
    pub async fn key_decrement(&self, key: &str) -> EResult<i64> {
        registry::key_decrement(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
    }
    #[inline]
    pub async fn key_get_recursive(&self, key: &str) -> EResult<Vec<(String, Value)>> {
        registry::key_get_recursive(&registry::format_svc_data_subkey(&self.id), key, &self.rpc)
            .await
    }
    #[inline]
    pub async fn key_delete(&self, key: &str) -> EResult<Value> {
        registry::key_delete(&registry::format_svc_data_subkey(&self.id), key, &self.rpc).await
    }
    #[inline]
    pub async fn key_delete_recursive(&self, key: &str) -> EResult<Value> {
        registry::key_delete_recursive(&registry::format_svc_data_subkey(&self.id), key, &self.rpc)
            .await
    }
}

/// Serde default for `Initial::workers`: one worker.
#[inline]
fn default_workers() -> u32 {
    const DEFAULT_WORKERS: u32 = 1;
    DEFAULT_WORKERS
}

/// Initial properties for services
///
/// All fields are private and exposed through the accessor methods of
/// `Initial`. Fields marked `#[serde(default)]` may be omitted from the
/// serialized payload.
#[derive(Debug, Serialize, Deserialize)]
pub struct Initial {
    /// Configuration format version; serialized under the key `version`.
    #[serde(rename = "version")]
    config_version: u16,
    /// System name, returned by `system_name()`.
    system_name: String,
    /// Service id; also used as the bus client name and the registry prefix
    /// source.
    id: String,
    /// Service command, returned by `command()`.
    command: String,
    /// Optional prepare command, returned by `prepare_command()`.
    #[serde(default)]
    prepare_command: Option<String>,
    /// Data path; `data_path()` hides it when the user is `nobody`,
    /// `planned_data_path()` always returns it.
    data_path: String,
    /// Default, startup and shutdown timeouts (seconds).
    timeout: Timeout,
    /// Core information (build, version, directory, log level, ...).
    core: CoreInfo,
    /// Bus connection settings.
    bus: BusConfig,
    /// Optional service configuration value.
    #[serde(default)]
    config: Option<Value>,
    /// Number of workers; defaults to `default_workers()` when omitted.
    #[serde(default = "default_workers")]
    workers: u32,
    /// Optional system user `drop_privileges()` switches to.
    #[serde(default)]
    user: Option<String>,
    /// Flag returned by `can_rtf()`.
    #[serde(default)]
    react_to_fail: bool,
    /// Fail-mode flag; atomic so `set_fail_mode()` can change it via `&self`.
    // NOTE(review): unlike `react_to_fail`, this field has no serde default,
    // so it apparently must be present in the payload — confirm this is
    // intended.
    #[serde(
        serialize_with = "crate::tools::serialize_atomic_bool",
        deserialize_with = "crate::tools::deserialize_atomic_bool"
    )]
    fail_mode: atomic::AtomicBool,
    /// When set, `init()` enables OpenSSL FIPS mode.
    #[serde(default)]
    fips: bool,
    /// Flag returned by `call_tracing()`.
    #[serde(default)]
    call_tracing: bool,
}

impl Initial {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: &str,
        system_name: &str,
        command: &str,
        prepare_command: Option<&str>,
        data_path: &str,
        timeout: &Timeout,
        core_info: CoreInfo,
        bus: BusConfig,
        config: Option<&Value>,
        workers: u32,
        user: Option<&str>,
        react_to_fail: bool,
        fips: bool,
        call_tracing: bool,
    ) -> Self {
        Self {
            config_version: SERVICE_CONFIG_VERSION,
            system_name: system_name.to_owned(),
            id: id.to_owned(),
            command: command.to_owned(),
            prepare_command: prepare_command.map(ToOwned::to_owned),
            data_path: data_path.to_owned(),
            timeout: timeout.clone(),
            core: core_info,
            bus,
            config: config.map(Clone::clone),
            workers,
            user: user.map(ToOwned::to_owned),
            react_to_fail,
            fail_mode: atomic::AtomicBool::new(false),
            fips,
            call_tracing,
        }
    }
    #[inline]
    pub fn init(&self) -> EResult<()> {
        if self.fips {
            openssl::fips::enable(true)?;
        }
        Ok(())
    }
    #[inline]
    pub fn config_version(&self) -> u16 {
        self.config_version
    }
    #[inline]
    pub fn system_name(&self) -> &str {
        &self.system_name
    }
    #[inline]
    pub fn id(&self) -> &str {
        &self.id
    }
    #[inline]
    pub fn command(&self) -> &str {
        &self.command
    }
    #[inline]
    pub fn prepare_command(&self) -> Option<&str> {
        self.prepare_command.as_deref()
    }
    #[inline]
    pub fn user(&self) -> Option<&str> {
        self.user.as_deref()
    }
    #[inline]
    pub fn data_path(&self) -> Option<&str> {
        if let Some(ref user) = self.user {
            if user == "nobody" {
                return None;
            }
        }
        Some(&self.data_path)
    }
    #[inline]
    pub fn planned_data_path(&self) -> &str {
        &self.data_path
    }
    #[inline]
    pub fn timeout(&self) -> Duration {
        self.timeout
            .default
            .map_or(crate::DEFAULT_TIMEOUT, Duration::from_secs_f64)
    }
    #[inline]
    pub fn startup_timeout(&self) -> Duration {
        self.timeout
            .startup
            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
    }
    #[inline]
    pub fn shutdown_timeout(&self) -> Duration {
        self.timeout
            .shutdown
            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
    }
    #[inline]
    pub fn bus_timeout(&self) -> Duration {
        self.bus
            .timeout
            .map_or_else(|| self.timeout(), Duration::from_secs_f64)
    }
    #[inline]
    pub fn eva_build(&self) -> u64 {
        self.core.build
    }
    #[inline]
    pub fn eva_version(&self) -> &str {
        &self.core.version
    }
    #[inline]
    pub fn eapi_version(&self) -> u16 {
        self.core.eapi_verion
    }
    #[inline]
    pub fn eva_dir(&self) -> &str {
        &self.core.path
    }
    #[inline]
    pub fn eva_log_level(&self) -> u8 {
        self.core.log_level
    }
    #[inline]
    pub fn core_active(&self) -> bool {
        self.core.active
    }
    #[inline]
    pub fn call_tracing(&self) -> bool {
        self.call_tracing
    }
    #[inline]
    pub fn eva_log_level_filter(&self) -> log::LevelFilter {
        match self.core.log_level {
            crate::LOG_LEVEL_TRACE => log::LevelFilter::Trace,
            crate::LOG_LEVEL_DEBUG => log::LevelFilter::Debug,
            crate::LOG_LEVEL_WARN => log::LevelFilter::Warn,
            crate::LOG_LEVEL_ERROR => log::LevelFilter::Error,
            _ => log::LevelFilter::Info,
        }
    }
    #[inline]
    pub fn bus_config(&self) -> EResult<busrt::ipc::Config> {
        if self.bus.tp == "native" {
            Ok(busrt::ipc::Config::new(&self.bus.path, &self.id)
                .buf_size(self.bus.buf_size)
                .buf_ttl(Duration::from_micros(self.bus.buf_ttl))
                .queue_size(self.bus.queue_size)
                .timeout(self.bus_timeout()))
        } else {
            Err(Error::not_implemented(format!(
                "bus type {} is not supported",
                self.bus.tp
            )))
        }
    }
    #[inline]
    pub fn bus_path(&self) -> &str {
        &self.bus.path
    }
    #[inline]
    pub fn config(&self) -> Option<&Value> {
        self.config.as_ref()
    }
    #[cfg(feature = "extended-value")]
    #[inline]
    pub async fn extend_config(&mut self, timeout: Duration, base: &Path) -> EResult<()> {
        self.config = if let Some(config) = self.config.take() {
            Some(config.extend(timeout, base).await?)
        } else {
            None
        };
        Ok(())
    }
    #[inline]
    pub fn workers(&self) -> u32 {
        self.workers
    }
    #[inline]
    pub fn bus_queue_size(&self) -> usize {
        self.bus.queue_size
    }
    #[inline]
    pub fn take_config(&mut self) -> Option<Value> {
        self.config.take()
    }
    #[inline]
    pub async fn init_rpc<R>(&self, handlers: R) -> EResult<Arc<RpcClient>>
    where
        R: RpcHandlers + Send + Sync + 'static,
    {
        self.init_rpc_opts(handlers, rpc::Options::default()).await
    }
    #[inline]
    pub async fn init_rpc_blocking<R>(&self, handlers: R) -> EResult<Arc<RpcClient>>
    where
        R: RpcHandlers + Send + Sync + 'static,
    {
        self.init_rpc_opts(
            handlers,
            rpc::Options::new()
                .blocking_notifications()
                .blocking_frames(),
        )
        .await
    }
    #[inline]
    pub async fn init_rpc_blocking_with_secondary<R>(
        &self,
        handlers: R,
    ) -> EResult<(Arc<RpcClient>, Arc<RpcClient>)>
    where
        R: RpcHandlers + Send + Sync + 'static,
    {
        let bus = self.init_bus_client().await?;
        let bus_secondary = bus.register_secondary().await?;
        let opts = rpc::Options::new()
            .blocking_notifications()
            .blocking_frames();
        let rpc = Arc::new(RpcClient::create(bus, handlers, opts.clone()));
        let rpc_secondary = Arc::new(RpcClient::create0(bus_secondary, opts));
        Ok((rpc, rpc_secondary))
    }
    pub async fn init_rpc_opts<R>(&self, handlers: R, opts: rpc::Options) -> EResult<Arc<RpcClient>>
    where
        R: RpcHandlers + Send + Sync + 'static,
    {
        let bus = self.init_bus_client().await?;
        let rpc = RpcClient::create(bus, handlers, opts);
        Ok(Arc::new(rpc))
    }
    pub async fn init_bus_client(&self) -> EResult<busrt::ipc::Client> {
        let bus = tokio::time::timeout(
            self.bus_timeout(),
            busrt::ipc::Client::connect(&self.bus_config()?),
        )
        .await??;
        Ok(bus)
    }
    #[inline]
    pub fn init_registry(&self, rpc: &Arc<RpcClient>) -> Registry {
        Registry {
            id: self.id.clone(),
            rpc: rpc.clone(),
        }
    }
    #[inline]
    pub fn can_rtf(&self) -> bool {
        self.react_to_fail
    }
    #[inline]
    pub fn is_mode_normal(&self) -> bool {
        !self.fail_mode.load(atomic::Ordering::SeqCst)
    }
    #[inline]
    pub fn is_mode_rtf(&self) -> bool {
        self.fail_mode.load(atomic::Ordering::SeqCst)
    }
    #[inline]
    pub fn set_fail_mode(&self, mode: bool) {
        self.fail_mode.store(mode, atomic::Ordering::SeqCst);
    }
    #[inline]
    pub fn drop_privileges(&self) -> EResult<()> {
        if let Some(ref user) = self.user {
            let u = get_system_user(user)?;
            if nix::unistd::getuid() != u.uid {
                let c_user = CString::new(user.as_str())
                    .map_err(|e| Error::failed(format!("Failed to parse user {}: {}", user, e)))?;

                let groups = nix::unistd::getgrouplist(&c_user, u.gid).map_err(|e| {
                    Error::failed(format!("Failed to get groups for user {}: {}", user, e))
                })?;
                nix::unistd::setgroups(&groups).map_err(|e| {
                    Error::failed(format!(
                        "Failed to switch the process groups for user {}: {}",
                        user, e
                    ))
                })?;
                nix::unistd::setgid(u.gid).map_err(|e| {
                    Error::failed(format!(
                        "Failed to switch the process group for user {}: {}",
                        user, e
                    ))
                })?;
                nix::unistd::setuid(u.uid).map_err(|e| {
                    Error::failed(format!(
                        "Failed to switch the process user to {}: {}",
                        user, e
                    ))
                })?;
            }
        }
        Ok(())
    }
}

pub fn get_system_user(user: &str) -> EResult<nix::unistd::User> {
    let u = nix::unistd::User::from_name(user)
        .map_err(|e| Error::failed(format!("failed to get the system user {}: {}", user, e)))?
        .ok_or_else(|| Error::failed(format!("Failed to locate the system user {}", user)))?;
    Ok(u)
}

pub fn get_system_group(group: &str) -> EResult<nix::unistd::Group> {
    let g = nix::unistd::Group::from_name(group)
        .map_err(|e| Error::failed(format!("failed to get the system group {}: {}", group, e)))?
        .ok_or_else(|| Error::failed(format!("Failed to locate the system group {}", group)))?;
    Ok(g)
}

/// Optional timeouts, each stored as a number of seconds.
///
/// Values are converted with `Duration::from_secs_f64` by the accessors.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct Timeout {
    // Startup timeout in seconds.
    startup: Option<f64>,
    // Shutdown timeout in seconds.
    shutdown: Option<f64>,
    // Default timeout in seconds.
    default: Option<f64>,
}

impl Timeout {
    /// Fills every timeout that is still unset with `timeout` (seconds);
    /// values that are already set are kept.
    pub fn offer(&mut self, timeout: f64) {
        self.startup = self.startup.or(Some(timeout));
        self.shutdown = self.shutdown.or(Some(timeout));
        self.default = self.default.or(Some(timeout));
    }
    /// Default timeout as a `Duration`, if set.
    pub fn get(&self) -> Option<Duration> {
        match self.default {
            Some(secs) => Some(Duration::from_secs_f64(secs)),
            None => None,
        }
    }
    /// Startup timeout as a `Duration`, if set.
    pub fn startup(&self) -> Option<Duration> {
        match self.startup {
            Some(secs) => Some(Duration::from_secs_f64(secs)),
            None => None,
        }
    }
    /// Shutdown timeout as a `Duration`, if set.
    pub fn shutdown(&self) -> Option<Duration> {
        match self.shutdown {
            Some(secs) => Some(Duration::from_secs_f64(secs)),
            None => None,
        }
    }
}

/// Information about the core, carried inside `Initial` and exposed through
/// its `eva_*` / `eapi_version` / `core_active` accessors.
#[derive(Debug, Serialize, Deserialize)]
pub struct CoreInfo {
    // Build number.
    build: u64,
    // Version string.
    version: String,
    // EAPI version. The spelling `eapi_verion` is also the serialized key
    // (no serde rename is applied), so renaming the field would change the
    // serialized format.
    eapi_verion: u16,
    // Path returned by `Initial::eva_dir()`.
    path: String,
    // Log level code, compared against the `crate::LOG_LEVEL_*` constants.
    log_level: u8,
    // Flag returned by `Initial::core_active()`.
    active: bool,
}

impl CoreInfo {
    /// Creates a `CoreInfo`, copying `version` and `path` into owned strings.
    pub fn new(
        build: u64,
        version: &str,
        eapi_verion: u16,
        path: &str,
        log_level: u8,
        active: bool,
    ) -> Self {
        let version = String::from(version);
        let path = String::from(path);
        Self {
            build,
            version,
            eapi_verion,
            path,
            log_level,
            active,
        }
    }
}

/// Serde default for the bus `type` field: `native`.
#[inline]
fn default_bus_type() -> String {
    String::from("native")
}

/// Serde default for `BusConfig::buf_size`: `busrt::DEFAULT_BUF_SIZE`.
#[inline]
fn default_bus_buf_size() -> usize {
    busrt::DEFAULT_BUF_SIZE
}

/// Serde default for `BusConfig::buf_ttl`: `busrt::DEFAULT_BUF_TTL` expressed
/// in microseconds.
// `as_micros()` is narrowed to `u64` with `as`, hence the lint allowance.
#[allow(clippy::cast_possible_truncation)]
#[inline]
fn default_bus_buf_ttl() -> u64 {
    busrt::DEFAULT_BUF_TTL.as_micros() as u64
}

/// Serde default for `BusConfig::queue_size`: `busrt::DEFAULT_QUEUE_SIZE`.
#[inline]
fn default_bus_queue_size() -> usize {
    busrt::DEFAULT_QUEUE_SIZE
}

/// Serde default for `BusConfig::ping_interval`: `1.0`.
#[inline]
fn default_bus_ping_interval() -> f64 {
    const DEFAULT_PING_INTERVAL: f64 = 1.0;
    DEFAULT_PING_INTERVAL
}

/// Bus connection settings, consumed by `Initial::bus_config()`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BusConfig {
    // Bus type, serialized under the key `type`; `Initial::bus_config()`
    // accepts only `native`.
    #[serde(rename = "type", default = "default_bus_type")]
    tp: String,
    // Bus path.
    path: String,
    // Bus timeout in seconds; `Initial::bus_timeout()` falls back to the
    // default timeout when unset.
    timeout: Option<f64>,
    // Buffer size passed to the bus client configuration.
    #[serde(default = "default_bus_buf_size")]
    buf_size: usize,
    #[serde(default = "default_bus_buf_ttl")]
    buf_ttl: u64, // microseconds
    // Queue size passed to the bus client configuration.
    #[serde(default = "default_bus_queue_size")]
    queue_size: usize,
    // Ping interval; not read anywhere in this file — presumably seconds,
    // confirm against the consumer.
    #[serde(default = "default_bus_ping_interval")]
    ping_interval: f64,
}

impl BusConfig {
    /// Bus path.
    pub fn path(&self) -> &str {
        self.path.as_str()
    }
    /// Replaces the bus path with a copy of `path`.
    pub fn set_path(&mut self, path: &str) {
        self.path = String::from(path);
    }
    /// Sets the bus timeout (seconds) unless one is already set.
    pub fn offer_timeout(&mut self, timeout: f64) {
        self.timeout = self.timeout.or(Some(timeout));
    }
}

/// Description of a single method parameter.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MethodParamInfo {
    // Whether the parameter is required; `false` when omitted on
    // deserialization.
    #[serde(default)]
    required: bool,
}

/// Description of a service method as stored in `ServiceInfo::methods`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MethodInfo {
    // Method description; empty when omitted on deserialization.
    #[serde(default)]
    description: String,
    // Parameters keyed by parameter name.
    params: HashMap<String, MethodParamInfo>,
}

/// Info-structure only, can be used by clients for auto-completion.
///
/// Built with `ServiceMethod::new` and its chained setters, then passed to
/// `ServiceInfo::add_method`.
pub struct ServiceMethod {
    // Method name; becomes the key in `ServiceInfo::methods`.
    name: String,
    // Method description.
    description: String,
    // Parameters keyed by parameter name.
    params: HashMap<String, MethodParamInfo>,
}

impl ServiceMethod {
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            description: String::new(),
            params: <_>::default(),
        }
    }
    pub fn description(mut self, desc: &str) -> Self {
        self.description = desc.to_owned();
        self
    }
    pub fn required(mut self, name: &str) -> Self {
        self.params
            .insert(name.to_owned(), MethodParamInfo { required: true });
        self
    }
    pub fn optional(mut self, name: &str) -> Self {
        self.params
            .insert(name.to_owned(), MethodParamInfo { required: false });
        self
    }
}

/// Returned by all services on "info" RPC command
///
/// Every field falls back to its default value when omitted on
/// deserialization.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServiceInfo {
    // Service author.
    #[serde(default)]
    author: String,
    // Service version.
    #[serde(default)]
    version: String,
    // Service description.
    #[serde(default)]
    description: String,
    // Methods keyed by method name; left out of the serialized form when
    // empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    methods: HashMap<String, MethodInfo>,
}

impl ServiceInfo {
    pub fn new(author: &str, version: &str, description: &str) -> Self {
        Self {
            author: author.to_owned(),
            version: version.to_owned(),
            description: description.to_owned(),
            methods: <_>::default(),
        }
    }
    #[inline]
    pub fn add_method(&mut self, method: ServiceMethod) {
        self.methods.insert(
            method.name,
            MethodInfo {
                description: method.description,
                params: method.params,
            },
        );
    }
}

/// Used by services to announce their status (for "*")
///
/// Wraps a single `ServiceStatusBroadcast` value under the key `status`.
#[derive(Serialize, Deserialize)]
pub struct ServiceStatusBroadcastEvent {
    /// Announced status.
    pub status: ServiceStatusBroadcast,
}

impl ServiceStatusBroadcastEvent {
    #[inline]
    pub fn ready() -> Self {
        Self {
            status: ServiceStatusBroadcast::Ready,
        }
    }
    #[inline]
    pub fn terminating() -> Self {
        Self {
            status: ServiceStatusBroadcast::Terminating,
        }
    }
}

/// Used by services and the core to notify about its state
///
/// Serialized as the lowercase variant name (`starting`, `ready`,
/// `terminating`, `unknown`); the `u8` discriminants are fixed explicitly.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[repr(u8)]
pub enum ServiceStatusBroadcast {
    /// Discriminant `0`.
    Starting = 0,
    /// Discriminant `1`.
    Ready = 1,
    /// Discriminant `0xef`.
    Terminating = 0xef,
    /// Discriminant `0xff`.
    Unknown = 0xff,
}

impl fmt::Display for ServiceStatusBroadcast {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}",
            match self {
                ServiceStatusBroadcast::Starting => "starting",
                ServiceStatusBroadcast::Ready => "ready",
                ServiceStatusBroadcast::Terminating => "terminating",
                ServiceStatusBroadcast::Unknown => "unknown",
            }
        )
    }
}