eva_client/
lib.rs

1use busrt::{
2    QoS, ipc,
3    rpc::{self, Rpc},
4};
5use eva_common::events::NodeInfo;
6use eva_common::payload::{pack, unpack};
7use eva_common::prelude::*;
8use hyper::{Body, Method, Request, client::HttpConnector};
9use hyper_tls::HttpsConnector;
10use rjrpc::{JsonRpcRequest, JsonRpcResponse};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::collections::BTreeMap;
13use std::fmt;
14use std::sync::atomic;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
18pub type NodeMap = BTreeMap<String, String>;
19
20pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
21
22static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
23const CT_HEADER: &str = "application/msgpack";
24
25pub const LOCAL: &str = ".local";
26
27#[derive(Deserialize)]
28pub struct SystemInfo {
29    pub system_name: String,
30    pub active: bool,
31    #[serde(flatten)]
32    pub ver: VersionInfo,
33}
34
35#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
36pub struct VersionInfo {
37    pub build: u64,
38    pub version: String,
39}
40
41impl fmt::Display for VersionInfo {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        write!(f, "{} ({})", self.build, self.version)
44    }
45}
46
47impl From<NodeInfo> for VersionInfo {
48    fn from(ni: NodeInfo) -> Self {
49        Self {
50            build: ni.build,
51            version: ni.version,
52        }
53    }
54}
55
56#[inline]
57fn parse_major(ver: &str) -> EResult<u16> {
58    ver.split('.').next().unwrap().parse().map_err(Into::into)
59}
60
61impl VersionInfo {
62    #[inline]
63    pub fn major_matches(&self, ver: &str) -> EResult<bool> {
64        Ok(parse_major(ver)? == self.major()?)
65    }
66    #[inline]
67    pub fn major(&self) -> EResult<u16> {
68        parse_major(&self.version)
69    }
70}
71
72#[allow(clippy::module_name_repetitions)]
73pub struct EvaCloudClient {
74    system_name: String,
75    client: Arc<EvaClient>,
76    node_map: NodeMap,
77}
78
79impl EvaCloudClient {
80    /// Create cloud client from a local
81    pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
82        Self {
83            system_name: system_name.to_owned(),
84            client: Arc::new(client),
85            node_map,
86        }
87    }
88    /// Create cloud client from scratch
89    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
90        let local_client = EvaClient::connect(path, base_name, config).await?;
91        local_client.transform_into_cloud_client().await
92    }
93    pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
94        let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
95        Ok(info)
96    }
97    pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
98    where
99        T: DeserializeOwned,
100    {
101        self.rpc_call(LOCAL, target, method, None).await
102    }
103    pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
104    where
105        T: DeserializeOwned,
106        V: Serialize,
107    {
108        self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
109            .await
110    }
111    pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
112    where
113        T: DeserializeOwned,
114    {
115        self.rpc_call(node, target, method, None).await
116    }
117    pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
118    where
119        T: DeserializeOwned,
120        V: Serialize,
121    {
122        self.rpc_call(node, target, method, Some(to_value(params)?))
123            .await
124    }
125    pub async fn rpc_call<T>(
126        &self,
127        node: &str,
128        target: &str,
129        method: &str,
130        params: Option<Value>,
131    ) -> EResult<T>
132    where
133        T: DeserializeOwned,
134    {
135        if node == ".local" || node == self.system_name {
136            self.client.call(target, method, params).await
137        } else {
138            let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
139                BTreeMap::deserialize(p).map_err(Error::invalid_data)?
140            } else {
141                BTreeMap::new()
142            };
143            repl_params.insert(
144                Value::String("node".to_owned()),
145                Value::String(node.to_owned()),
146            );
147            self.client
148                .call(
149                    self.node_map.get(node).ok_or_else(|| {
150                        Error::failed(format!("no replication service mapped for {}", node))
151                    })?,
152                    &format!("bus::{}::{}", target, method),
153                    Some(to_value(repl_params)?),
154                )
155                .await
156        }
157    }
158    #[inline]
159    pub fn client(&self) -> &EvaClient {
160        &self.client
161    }
162    #[inline]
163    pub fn client_cloned(&self) -> Arc<EvaClient> {
164        self.client.clone()
165    }
166}
167
/// Client configuration, filled in with the builder-style methods of [`Config`].
#[derive(Clone, Debug)]
pub struct Config {
    /// Login and password, used to log in over HTTP
    credentials: Option<(String, String)>,
    /// API key/token supplied by the caller
    token: Option<String>,
    /// Timeout applied to connections and calls
    timeout: Duration,
}
174
175impl Config {
176    #[inline]
177    pub fn new() -> Self {
178        Self::default()
179    }
180    /// Set API key/token
181    #[inline]
182    pub fn token(mut self, token: &str) -> Self {
183        self.token = Some(token.to_owned());
184        self
185    }
186    /// perform HTTP login with credentials
187    #[inline]
188    pub fn credentials(mut self, login: &str, password: &str) -> Self {
189        self.credentials = Some((login.to_owned(), password.to_owned()));
190        self
191    }
192    #[inline]
193    pub fn timeout(mut self, timeout: Duration) -> Self {
194        self.timeout = timeout;
195        self
196    }
197}
198
199impl Default for Config {
200    #[inline]
201    fn default() -> Self {
202        Self {
203            credentials: None,
204            token: None,
205            timeout: eva_common::DEFAULT_TIMEOUT,
206        }
207    }
208}
209
210#[allow(clippy::module_name_repetitions)]
211pub struct EvaClient {
212    name: String,
213    client: ClientKind,
214    config: Config,
215    token: Mutex<Option<Arc<String>>>,
216    token_preassigned: bool,
217    path: String,
218    request_id: atomic::AtomicU32,
219}
220
221impl EvaClient {
222    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
223        if path.starts_with("http://") || path.starts_with("https://") {
224            let https = HttpsConnector::new();
225            let http_client: hyper::Client<_> = hyper::Client::builder()
226                .pool_idle_timeout(config.timeout)
227                .build(https);
228            let token = config.token.clone();
229            let has_token = token.is_some();
230            let cl = Self {
231                name: base_name.to_owned(),
232                client: ClientKind::Http(http_client),
233                config,
234                token: Mutex::new(token.map(Arc::new)),
235                token_preassigned: has_token,
236                path: path.to_owned(),
237                request_id: atomic::AtomicU32::new(0),
238            };
239            if !has_token && let ClientKind::Http(ref client) = cl.client {
240                cl.http_login(client).await?;
241            }
242            Ok(cl)
243        } else {
244            let name = format!(
245                "{}.{}.{}",
246                base_name,
247                std::process::id(),
248                CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
249            );
250            let bus = ipc::Client::connect(&ipc::Config::new(path, &name).timeout(config.timeout))
251                .await?;
252            let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
253            Ok(Self {
254                name,
255                client: ClientKind::Bus(rpc),
256                config,
257                token: <_>::default(),
258                token_preassigned: false,
259                path: path.to_owned(),
260                request_id: atomic::AtomicU32::new(0),
261            })
262        }
263    }
264    #[inline]
265    pub fn name(&self) -> &str {
266        &self.name
267    }
268    pub async fn get_system_info(&self) -> EResult<SystemInfo> {
269        let info: SystemInfo = self.call0("eva.core", "test").await?;
270        Ok(info)
271    }
272    async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
273        #[derive(Serialize)]
274        struct LoginParams<'a> {
275            u: &'a str,
276            p: &'a str,
277        }
278        #[derive(Deserialize)]
279        struct LoginPayload {
280            token: String,
281        }
282        if let Some(ref creds) = self.config.credentials {
283            let p: LoginPayload = self
284                .safe_http_call(
285                    client,
286                    None,
287                    None,
288                    "login",
289                    Some(to_value(LoginParams {
290                        u: &creds.0,
291                        p: &creds.1,
292                    })?),
293                )
294                .await?;
295            let token = Arc::new(p.token);
296            self.token.lock().unwrap().replace(token.clone());
297            Ok(token)
298        } else {
299            Err(Error::access("no credentials set"))
300        }
301    }
302    pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
303        #[derive(Deserialize)]
304        struct NodeList {
305            name: String,
306            svc: Option<String>,
307        }
308        let system_name = self.get_system_info().await?.system_name;
309        let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
310        let node_map: NodeMap = node_list
311            .into_iter()
312            .filter_map(|v| v.svc.map(|s| (v.name, s)))
313            .collect();
314        Ok(EvaCloudClient::from_eva_client(
315            &system_name,
316            self,
317            node_map,
318        ))
319    }
320    pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
321    where
322        T: DeserializeOwned,
323    {
324        self.rpc_call(target, method, None::<()>).await
325    }
326    pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
327    where
328        T: DeserializeOwned,
329        V: Serialize,
330    {
331        self.rpc_call(target, method, Some(params)).await
332    }
333    /// # Panics
334    ///
335    /// Will panic if token mutex is poisoned
336    pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
337    where
338        T: DeserializeOwned,
339        V: Serialize,
340    {
341        match self.client {
342            ClientKind::Bus(ref c) => {
343                let payload: busrt::borrow::Cow = if let Some(ref p) = params {
344                    pack(p)?.into()
345                } else {
346                    busrt::empty_payload!()
347                };
348                let res = tokio::time::timeout(
349                    self.config.timeout,
350                    c.call(target, method, payload, QoS::Processed),
351                )
352                .await??;
353                let result = res.payload();
354                if result.is_empty() {
355                    Ok(T::deserialize(Value::Unit)?)
356                } else {
357                    Ok(unpack(result)?)
358                }
359            }
360            ClientKind::Http(ref client) => {
361                let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
362                let params_payload = if let Some(p) = params {
363                    Some(to_value(p)?)
364                } else {
365                    None
366                };
367                if let Some(token) = to {
368                    match self
369                        .safe_http_call(
370                            client,
371                            Some(&token),
372                            Some(target),
373                            method,
374                            params_payload.clone(),
375                        )
376                        .await
377                    {
378                        Err(e)
379                            if !self.token_preassigned
380                                && e.kind() == ErrorKind::AccessDenied
381                                && (e.message() == Some("invalid token")) =>
382                        {
383                            // repeat request with new token
384                            let token = self.http_login(client).await?;
385                            self.safe_http_call(
386                                client,
387                                Some(&token),
388                                Some(target),
389                                method,
390                                params_payload,
391                            )
392                            .await
393                        }
394                        res => res,
395                    }
396                } else {
397                    let token = self.http_login(client).await?;
398                    self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
399                        .await
400                }
401            }
402        }
403    }
404    async fn safe_http_call<T>(
405        &self,
406        client: &HttpClient,
407        token: Option<&str>,
408        target: Option<&str>,
409        method: &str,
410        params: Option<Value>,
411    ) -> EResult<T>
412    where
413        T: DeserializeOwned,
414    {
415        tokio::time::timeout(
416            self.config.timeout,
417            self.http_call(client, token, target, method, params),
418        )
419        .await?
420    }
421    async fn http_call<T>(
422        &self,
423        client: &HttpClient,
424        token: Option<&str>,
425        target: Option<&str>,
426        method: &str,
427        params: Option<Value>,
428    ) -> EResult<T>
429    where
430        T: DeserializeOwned,
431    {
432        macro_rules! params_map {
433            ($map: expr, $token: expr) => {{
434                $map.insert(
435                    Value::String("k".to_owned()),
436                    Value::String($token.to_owned()),
437                );
438                Some(Value::Map($map))
439            }};
440        }
441        let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
442        let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
443        let request = JsonRpcRequest::new(
444            Some(Value::U32(id)),
445            if let Some(ref m) = bus_method {
446                m
447            } else {
448                method
449            },
450            if let Some(tk) = token {
451                if let Some(par) = params {
452                    let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
453                    params_map!(p_map, tk)
454                } else {
455                    let mut p_map = BTreeMap::new();
456                    params_map!(p_map, tk)
457                }
458            } else {
459                params
460            },
461            rjrpc::Encoding::MsgPack,
462        );
463        let http_request = Request::builder()
464            .method(Method::POST)
465            .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
466            .uri(&self.path)
467            .body(Body::from(request.pack().map_err(Error::invalid_data)?))
468            .map_err(Error::io)?;
469        let http_res = client.request(http_request).await.map_err(Error::io)?;
470        let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
471        let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
472            .map_err(Error::invalid_data)?;
473        if u32::try_from(res.id)? == id {
474            if let Some(err) = res.error {
475                Err(Error::newc(err.code.into(), err.message))
476            } else if let Some(result) = res.result {
477                Ok(T::deserialize(result).map_err(Error::invalid_data)?)
478            } else {
479                Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
480            }
481        } else {
482            Err(Error::io("invalid JRPC response: id mismatch"))
483        }
484    }
485}
486
487enum ClientKind {
488    Bus(rpc::RpcClient),
489    Http(HttpClient),
490}