eva_common/
registry.rs

1use crate::err_logger;
2use crate::payload::{pack, unpack};
3use crate::prelude::*;
4use busrt::rpc::{Rpc, RpcClient};
5use busrt::QoS;
6use serde::{Deserialize, Serialize};
7
8err_logger!();
9
/// Top-level component prepended to every key built by the `format_*` helpers below.
pub const GLOBAL_KEY_PREFIX: &str = "eva";
/// Bus target name used for all registry RPC calls issued from this module.
pub const SERVICE_NAME: &str = "eva.registry";

// Names of registry sub-trees, used as the second key component (the `prefix`
// argument of `format_key`). The per-constant descriptions are inferred from the
// identifiers only -- NOTE(review): confirm against the registry layout.

/// Sub-tree name `inventory`.
pub const R_INVENTORY: &str = "inventory";
/// Sub-tree name `state`.
pub const R_STATE: &str = "state";
/// Sub-tree name `svc` (presumably service definitions).
pub const R_SERVICE: &str = "svc";
/// Sub-tree name `svc_data`; used by `format_svc_data_key` and `format_svc_data_subkey`.
pub const R_SERVICE_DATA: &str = "svc_data";
/// Sub-tree name `user_data`.
pub const R_USER_DATA: &str = "user_data";
/// Sub-tree name `config`; used by `format_config_key`.
pub const R_CONFIG: &str = "config";
/// Sub-tree name `data`; used by `format_data_key`.
pub const R_DATA: &str = "data";
/// Sub-tree name `cache`.
pub const R_CACHE: &str = "cache";
/// Sub-tree name `dobj` (presumably data objects).
pub const R_DATA_OBJECT: &str = "dobj";
22
// The functions below are public because the core accesses the registry directly as a database during startup.
24#[inline]
25pub fn format_top_key(key: &str) -> String {
26    format!("{}/{}", GLOBAL_KEY_PREFIX, key)
27}
28
29#[inline]
30pub fn format_key(prefix: &str, key: &str) -> String {
31    format!("{}/{}/{}", GLOBAL_KEY_PREFIX, prefix, key)
32}
33
34#[inline]
35pub fn format_config_key(key: &str) -> String {
36    format!("{}/{}/{}", GLOBAL_KEY_PREFIX, R_CONFIG, key)
37}
38
39#[inline]
40pub fn format_data_key(key: &str) -> String {
41    format!("{}/{}/{}", GLOBAL_KEY_PREFIX, R_DATA, key)
42}
43
44#[inline]
45pub fn format_svc_data_key(key: &str) -> String {
46    format!("{}/{}/{}", GLOBAL_KEY_PREFIX, R_SERVICE_DATA, key)
47}
48
49#[inline]
50pub fn format_svc_data_subkey(key: &str) -> String {
51    format!("{}/{}", R_SERVICE_DATA, key)
52}
53
54#[inline]
55async fn call<P>(method: &str, payload: P, rpc: &RpcClient) -> EResult<Value>
56where
57    P: Serialize,
58{
59    let result = rpc
60        .call(
61            SERVICE_NAME,
62            method,
63            pack(&payload)
64                .log_err_with("unable to pack registry call payload")?
65                .into(),
66            QoS::Processed,
67        )
68        .await
69        .map_err(|e| {
70            Error::registry(std::str::from_utf8(e.data().unwrap_or(&[])).unwrap_or_default())
71        })?;
72    unpack(result.payload())
73}
74
/// Parameters of the `key_set` registry call (built in `key_set`): a key together
/// with the value to store under it.
#[derive(Serialize)]
struct PayloadKeySet {
    /// Full registry key, as produced by `format_key`.
    key: String,
    /// Value to store under `key`.
    value: Value,
}
80
/// Parameters of the registry calls that address a single key and carry no value
/// (`key_get`, `key_increment`, `key_decrement`, `key_get_recursive`,
/// `key_delete`, `key_delete_recursive`).
#[derive(Serialize)]
struct PayloadKey {
    /// Full registry key, as produced by `format_key`.
    key: String,
}
85
86#[inline]
87pub async fn key_set<V>(prefix: &str, key: &str, value: V, rpc: &RpcClient) -> EResult<Value>
88where
89    V: Serialize,
90{
91    let payload = PayloadKeySet {
92        key: format_key(prefix, key),
93        value: to_value(value)?,
94    };
95    call("key_set", payload, rpc).await
96}
97
98#[inline]
99pub async fn key_get(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
100    let payload = PayloadKey {
101        key: format_key(prefix, key),
102    };
103    call("key_get", payload, rpc).await
104}
105
106#[inline]
107pub async fn key_increment(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<i64> {
108    let payload = PayloadKey {
109        key: format_key(prefix, key),
110    };
111    TryInto::<i64>::try_into(call("key_increment", payload, rpc).await?).map_err(Into::into)
112}
113
114#[inline]
115pub async fn key_decrement(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<i64> {
116    let payload = PayloadKey {
117        key: format_key(prefix, key),
118    };
119    TryInto::<i64>::try_into(call("key_decrement", payload, rpc).await?).map_err(Into::into)
120}
121
122#[inline]
123pub async fn key_get_recursive(
124    prefix: &str,
125    key: &str,
126    rpc: &RpcClient,
127) -> EResult<Vec<(String, Value)>> {
128    let payload = PayloadKey {
129        key: format_key(prefix, key),
130    };
131    let key_len = payload.key.len() + 1;
132    let val = call("key_get_recursive", payload, rpc).await?;
133    let res: Vec<(String, Value)> = Vec::deserialize(val)?;
134    let mut result: Vec<(String, Value)> = Vec::new();
135    for (k, v) in res {
136        if k.len() < key_len {
137            return Err(Error::invalid_data(format!(
138                "invalid key name returned by the registry: {}",
139                k
140            )));
141        }
142        result.push((k[key_len..].to_string(), v));
143    }
144    Ok(result)
145}
146
147#[inline]
148pub async fn key_delete(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
149    let payload = PayloadKey {
150        key: format_key(prefix, key),
151    };
152    call("key_delete", payload, rpc).await
153}
154
155#[inline]
156pub async fn key_delete_recursive(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
157    let payload = PayloadKey {
158        key: format_key(prefix, key),
159    };
160    call("key_delete_recursive", payload, rpc).await
161}