1use crate::err_logger;
2use crate::payload::{pack, unpack};
3use crate::prelude::*;
4use busrt::rpc::{Rpc, RpcClient};
5use busrt::QoS;
6use serde::{Deserialize, Serialize};
7
// NOTE(review): presumably this macro generates the `log_err_with` helper that `call` applies
// to payload packing errors — confirm against the `err_logger!` definition.
err_logger!();
9
/// First path component of every full key built by `format_top_key`, `format_key` and the
/// `format_*_key` variants in this module.
pub const GLOBAL_KEY_PREFIX: &str = "eva";
/// RPC target name that `call` sends every registry request to.
pub const SERVICE_NAME: &str = "eva.registry";

// Key sub-prefixes: the path component placed right after `GLOBAL_KEY_PREFIX` by `format_key`
// and its specialised variants.
// NOTE(review): in the visible part of this file only `R_CONFIG`, `R_DATA` and
// `R_SERVICE_DATA` are used directly; what the other sections hold is inferred from their
// names only — confirm against the registry documentation.
pub const R_INVENTORY: &str = "inventory";
pub const R_STATE: &str = "state";
pub const R_SERVICE: &str = "svc";
pub const R_SERVICE_DATA: &str = "svc_data";
pub const R_USER_DATA: &str = "user_data";
pub const R_CONFIG: &str = "config";
pub const R_DATA: &str = "data";
pub const R_CACHE: &str = "cache";
pub const R_DATA_OBJECT: &str = "dobj";
22
23#[inline]
25pub fn format_top_key(key: &str) -> String {
26 format!("{}/{}", GLOBAL_KEY_PREFIX, key)
27}
28
29#[inline]
30pub fn format_key(prefix: &str, key: &str) -> String {
31 format!("{}/{}/{}", GLOBAL_KEY_PREFIX, prefix, key)
32}
33
34#[inline]
35pub fn format_config_key(key: &str) -> String {
36 format!("{}/{}/{}", GLOBAL_KEY_PREFIX, R_CONFIG, key)
37}
38
39#[inline]
40pub fn format_data_key(key: &str) -> String {
41 format!("{}/{}/{}", GLOBAL_KEY_PREFIX, R_DATA, key)
42}
43
44#[inline]
45pub fn format_svc_data_key(key: &str) -> String {
46 format!("{}/{}/{}", GLOBAL_KEY_PREFIX, R_SERVICE_DATA, key)
47}
48
49#[inline]
50pub fn format_svc_data_subkey(key: &str) -> String {
51 format!("{}/{}", R_SERVICE_DATA, key)
52}
53
54#[inline]
55async fn call<P>(method: &str, payload: P, rpc: &RpcClient) -> EResult<Value>
56where
57 P: Serialize,
58{
59 let result = rpc
60 .call(
61 SERVICE_NAME,
62 method,
63 pack(&payload)
64 .log_err_with("unable to pack registry call payload")?
65 .into(),
66 QoS::Processed,
67 )
68 .await
69 .map_err(|e| {
70 Error::registry(std::str::from_utf8(e.data().unwrap_or(&[])).unwrap_or_default())
71 })?;
72 unpack(result.payload())
73}
74
/// Request payload carrying a registry key together with a value; built by `key_set`.
#[derive(Serialize)]
struct PayloadKeySet {
    /// Full registry key (`key_set` fills it with the output of `format_key`).
    key: String,
    /// The value to send, already converted with `to_value` by `key_set`.
    value: Value,
}
80
/// Request payload carrying only a registry key; used by every key operation in this module
/// that does not send a value (get, increment, decrement, delete and the recursive variants).
#[derive(Serialize)]
struct PayloadKey {
    /// Full registry key (callers fill it with the output of `format_key`).
    key: String,
}
85
86#[inline]
87pub async fn key_set<V>(prefix: &str, key: &str, value: V, rpc: &RpcClient) -> EResult<Value>
88where
89 V: Serialize,
90{
91 let payload = PayloadKeySet {
92 key: format_key(prefix, key),
93 value: to_value(value)?,
94 };
95 call("key_set", payload, rpc).await
96}
97
98#[inline]
99pub async fn key_get(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
100 let payload = PayloadKey {
101 key: format_key(prefix, key),
102 };
103 call("key_get", payload, rpc).await
104}
105
106#[inline]
107pub async fn key_increment(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<i64> {
108 let payload = PayloadKey {
109 key: format_key(prefix, key),
110 };
111 TryInto::<i64>::try_into(call("key_increment", payload, rpc).await?).map_err(Into::into)
112}
113
114#[inline]
115pub async fn key_decrement(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<i64> {
116 let payload = PayloadKey {
117 key: format_key(prefix, key),
118 };
119 TryInto::<i64>::try_into(call("key_decrement", payload, rpc).await?).map_err(Into::into)
120}
121
122#[inline]
123pub async fn key_get_recursive(
124 prefix: &str,
125 key: &str,
126 rpc: &RpcClient,
127) -> EResult<Vec<(String, Value)>> {
128 let payload = PayloadKey {
129 key: format_key(prefix, key),
130 };
131 let key_len = payload.key.len() + 1;
132 let val = call("key_get_recursive", payload, rpc).await?;
133 let res: Vec<(String, Value)> = Vec::deserialize(val)?;
134 let mut result: Vec<(String, Value)> = Vec::new();
135 for (k, v) in res {
136 if k.len() < key_len {
137 return Err(Error::invalid_data(format!(
138 "invalid key name returned by the registry: {}",
139 k
140 )));
141 }
142 result.push((k[key_len..].to_string(), v));
143 }
144 Ok(result)
145}
146
147#[inline]
148pub async fn key_delete(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
149 let payload = PayloadKey {
150 key: format_key(prefix, key),
151 };
152 call("key_delete", payload, rpc).await
153}
154
155#[inline]
156pub async fn key_delete_recursive(prefix: &str, key: &str, rpc: &RpcClient) -> EResult<Value> {
157 let payload = PayloadKey {
158 key: format_key(prefix, key),
159 };
160 call("key_delete_recursive", payload, rpc).await
161}