use crate::tools::ErrLogger;
use crate::{EResult, Error};
use elbus::rpc::{Rpc, RpcClient};
use elbus::QoS;
use eva_common::prelude::*;
use log::debug;
use serde::{Deserialize, Serialize};
/// First path segment of every key produced by the `format_*` helpers in this module.
pub const GLOBAL_KEY_PREFIX: &str = "eva";
/// Name of the RPC target that all registry calls in this module are sent to.
pub const SERVICE_NAME: &str = "eva.registry";
// The `R_*` constants below are not referenced in the visible part of this
// file. NOTE(review): presumably they are meant to be passed as the `prefix`
// argument of `format_key` and the `key_*` functions — confirm against callers.
/// Sub-tree name `inventory`.
pub const R_INVENTORY: &str = "inventory";
/// Sub-tree name `state`.
pub const R_STATE: &str = "state";
/// Sub-tree name `svc`.
pub const R_SERVICE: &str = "svc";
/// Sub-tree name `svc_data`.
pub const R_SERVICE_DATA: &str = "svc_data";
/// Sub-tree name `config` (the same segment `format_config_key` inserts).
pub const R_CONFIG: &str = "config";
/// Sub-tree name `data` (the same segment `format_data_key` inserts).
pub const R_DATA: &str = "data";
/// Sub-tree name `cache`.
pub const R_CACHE: &str = "cache";
/// Builds a top-level registry key: `<GLOBAL_KEY_PREFIX>/<key>`.
///
/// `key` is appended verbatim; an empty `key` yields the prefix followed by
/// a trailing slash.
#[inline]
pub fn format_top_key(key: &str) -> String {
    [GLOBAL_KEY_PREFIX, key].join("/")
}
/// Builds a registry key inside a sub-tree: `<GLOBAL_KEY_PREFIX>/<prefix>/<key>`.
///
/// Both `prefix` and `key` are inserted verbatim (no escaping or trimming).
#[inline]
pub fn format_key(prefix: &str, key: &str) -> String {
    let mut full_key = String::from(GLOBAL_KEY_PREFIX);
    for segment in [prefix, key] {
        full_key.push('/');
        full_key.push_str(segment);
    }
    full_key
}
/// Builds a key in the configuration sub-tree:
/// `<GLOBAL_KEY_PREFIX>/config/<key>`.
#[inline]
pub fn format_config_key(key: &str) -> String {
    [GLOBAL_KEY_PREFIX, "config", key].join("/")
}
/// Builds a key in the data sub-tree: `<GLOBAL_KEY_PREFIX>/data/<key>`.
#[inline]
pub fn format_data_key(key: &str) -> String {
    let mut full_key = String::from(GLOBAL_KEY_PREFIX);
    full_key.push_str("/data/");
    full_key.push_str(key);
    full_key
}
#[inline]
async fn call<P>(method: &str, payload: P, rpc: &RpcClient) -> EResult<Value>
where
P: Serialize,
{
let result = rpc
.call(
SERVICE_NAME,
method,
rmp_serde::to_vec_named(&payload).log_err()?.into(),
QoS::Processed,
)
.await
.map_err(|e| {
Error::registry(std::str::from_utf8(e.data().unwrap_or(&[])).unwrap_or_default())
})?;
rmp_serde::from_read_ref(result.payload()).map_err(Into::into)
}
/// Request parameters sent by `key_set`: the full key and the value to store.
///
/// Field names are part of the serialized request (the payload is encoded
/// with named fields), so they must not be renamed.
#[derive(Serialize)]
struct PayloadKeySet {
// Full registry key, as produced by `format_key`.
key: String,
// Value to store, already converted with `to_value`.
value: Value,
}
/// Request parameters for the registry methods that take only a key
/// (`key_get`, `key_increment`, `key_decrement`, `key_get_recursive`,
/// `key_delete`, `key_delete_recursive`).
///
/// The field name is part of the serialized request and must not be renamed.
#[derive(Serialize)]
struct PayloadKey {
// Full registry key, as produced by `format_key`.
key: String,
}
/// Stores `value` under `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` by calling the
/// registry method `key_set`.
///
/// Returns the registry's reply as-is.
///
/// # Errors
///
/// Fails if `value` cannot be converted with `to_value`, or if the
/// underlying RPC call fails.
#[inline]
pub async fn key_set<V>(prefix: &str, key: &str, value: V, rpc: &RpcClient) -> EResult<Value>
where
    V: Serialize,
{
    let key = format_key(prefix, key);
    let value = to_value(value)?;
    call("key_set", PayloadKeySet { key, value }, rpc).await
}
/// Fetches the value stored under `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` by
/// calling the registry method `key_get`.
///
/// # Errors
///
/// Propagates any error from the underlying RPC call.
#[inline]
pub async fn key_get(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
    let key = format_key(prefix, key);
    call("key_get", PayloadKey { key }, rpc).await
}
/// Calls the registry method `key_increment` for
/// `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` and returns the reply as an `i64`.
///
/// # Errors
///
/// Fails if the RPC call fails or the reply cannot be converted to `i64`.
#[inline]
pub async fn key_increment(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<i64> {
    let key = format_key(prefix, key);
    let reply = call("key_increment", PayloadKey { key }, rpc).await?;
    // The target type (`i64`) is inferred from the return type.
    reply.try_into().map_err(Into::into)
}
/// Calls the registry method `key_decrement` for
/// `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` and returns the reply as an `i64`.
///
/// # Errors
///
/// Fails if the RPC call fails or the reply cannot be converted to `i64`.
#[inline]
pub async fn key_decrement(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<i64> {
    let key = format_key(prefix, key);
    let reply = call("key_decrement", PayloadKey { key }, rpc).await?;
    // The target type (`i64`) is inferred from the return type.
    reply.try_into().map_err(Into::into)
}
/// Calls the registry method `key_get_recursive` for
/// `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` and returns the reply converted to a
/// list of values.
///
/// # Errors
///
/// Fails if the RPC call fails or the reply cannot be converted to
/// `Vec<Value>`.
#[inline]
pub async fn key_get_recursive(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Vec<Value>> {
    let key = format_key(prefix, key);
    let reply = call("key_get_recursive", PayloadKey { key }, rpc).await?;
    reply.try_into().map_err(Into::into)
}
/// Calls the registry method `key_delete` for
/// `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` and returns the registry's reply.
///
/// # Errors
///
/// Propagates any error from the underlying RPC call.
#[inline]
pub async fn key_delete(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
    let key = format_key(prefix, key);
    call("key_delete", PayloadKey { key }, rpc).await
}
/// Calls the registry method `key_delete_recursive` for
/// `<GLOBAL_KEY_PREFIX>/<prefix>/<key>` and returns the registry's reply.
///
/// # Errors
///
/// Propagates any error from the underlying RPC call.
#[inline]
pub async fn key_delete_recursive(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
    let key = format_key(prefix, key);
    call("key_delete_recursive", PayloadKey { key }, rpc).await
}
/// Database settings that `load` reads from the registry configuration key
/// and copies onto the `yedb::Database` instance.
///
/// No field carries a serde default, so deserialization is expected to fail
/// when any of them is missing from the stored value.
#[derive(Deserialize)]
struct RegistryConfig {
// Copied to `db.auto_bak`.
auto_bak: u64,
// Copied to `db.auto_flush`.
auto_flush: bool,
// Passed to `db.set_cache_size`.
cache_size: usize,
// Copied to `db.skip_bak`.
skip_bak: Vec<String>,
// Copied to `db.strict_schema`.
strict_schema: bool,
}
pub fn load(db: &mut yedb::Database) -> EResult<()> {
let registry_config: RegistryConfig =
serde_json::from_value(db.key_get(&format_config_key("registry"))?)?;
debug!("registry.auto_bak = {}", registry_config.auto_bak);
db.auto_bak = registry_config.auto_bak;
debug!("registry.auto_flush = {}", registry_config.auto_flush);
db.auto_flush = registry_config.auto_flush;
debug!("registry.cache_size = {:?}", registry_config.cache_size);
db.set_cache_size(registry_config.cache_size);
debug!("registry.skip_bak = {}", registry_config.skip_bak.join(","));
db.skip_bak = registry_config.skip_bak;
debug!("registry.strict_schema = {}", registry_config.strict_schema);
db.strict_schema = registry_config.strict_schema;
Ok(())
}