// eva_client/lib.rs

1use busrt::{
2    QoS, ipc,
3    rpc::{self, Rpc},
4};
5use eva_common::events::NodeInfo;
6use eva_common::payload::{pack, unpack};
7use eva_common::prelude::*;
8use hyper::{Body, Method, Request, client::HttpConnector};
9use hyper_tls::HttpsConnector;
10use rjrpc::{JsonRpcRequest, JsonRpcResponse};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::collections::BTreeMap;
13use std::fmt;
14use std::sync::atomic;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
18pub type NodeMap = BTreeMap<String, String>;
19
20pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
21
22static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
23const CT_HEADER: &str = "application/msgpack";
24
25pub const LOCAL: &str = ".local";
26
27#[derive(Deserialize)]
28pub struct SystemInfo {
29    pub system_name: String,
30    pub active: bool,
31    #[serde(flatten)]
32    pub ver: VersionInfo,
33}
34
35#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
36pub struct VersionInfo {
37    pub build: u64,
38    pub version: String,
39}
40
41impl fmt::Display for VersionInfo {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        write!(f, "{} ({})", self.build, self.version)
44    }
45}
46
47impl From<NodeInfo> for VersionInfo {
48    fn from(ni: NodeInfo) -> Self {
49        Self {
50            build: ni.build,
51            version: ni.version,
52        }
53    }
54}
55
56#[inline]
57fn parse_major(ver: &str) -> EResult<u16> {
58    ver.split('.').next().unwrap().parse().map_err(Into::into)
59}
60
61impl VersionInfo {
62    #[inline]
63    pub fn major_matches(&self, ver: &str) -> EResult<bool> {
64        Ok(parse_major(ver)? == self.major()?)
65    }
66    #[inline]
67    pub fn major(&self) -> EResult<u16> {
68        parse_major(&self.version)
69    }
70}
71
72#[allow(clippy::module_name_repetitions)]
73pub struct EvaCloudClient {
74    system_name: String,
75    client: Arc<EvaClient>,
76    node_map: NodeMap,
77}
78
79impl EvaCloudClient {
80    /// Create cloud client from a local
81    pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
82        Self {
83            system_name: system_name.to_owned(),
84            client: Arc::new(client),
85            node_map,
86        }
87    }
88    /// Create cloud client from scratch
89    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
90        let local_client = EvaClient::connect(path, base_name, config).await?;
91        local_client.transform_into_cloud_client().await
92    }
93    pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
94        let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
95        Ok(info)
96    }
97    pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
98    where
99        T: DeserializeOwned,
100    {
101        self.rpc_call(LOCAL, target, method, None).await
102    }
103    pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
104    where
105        T: DeserializeOwned,
106        V: Serialize,
107    {
108        self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
109            .await
110    }
111    pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
112    where
113        T: DeserializeOwned,
114    {
115        self.rpc_call(node, target, method, None).await
116    }
117    pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
118    where
119        T: DeserializeOwned,
120        V: Serialize,
121    {
122        self.rpc_call(node, target, method, Some(to_value(params)?))
123            .await
124    }
125    pub async fn rpc_call<T>(
126        &self,
127        node: &str,
128        target: &str,
129        method: &str,
130        params: Option<Value>,
131    ) -> EResult<T>
132    where
133        T: DeserializeOwned,
134    {
135        if node == ".local" || node == self.system_name {
136            if let Some(params) = params {
137                self.client.call(target, method, params).await
138            } else {
139                self.client.call0(target, method).await
140            }
141        } else {
142            let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
143                BTreeMap::deserialize(p).map_err(Error::invalid_data)?
144            } else {
145                BTreeMap::new()
146            };
147            repl_params.insert(
148                Value::String("node".to_owned()),
149                Value::String(node.to_owned()),
150            );
151            self.client
152                .call(
153                    self.node_map.get(node).ok_or_else(|| {
154                        Error::failed(format!("no replication service mapped for {}", node))
155                    })?,
156                    &format!("bus::{}::{}", target, method),
157                    Some(to_value(repl_params)?),
158                )
159                .await
160        }
161    }
162    #[inline]
163    pub fn client(&self) -> &EvaClient {
164        &self.client
165    }
166    #[inline]
167    pub fn client_cloned(&self) -> Arc<EvaClient> {
168        self.client.clone()
169    }
170}
171
/// Connection settings of a client.
///
/// NOTE(review): the derived `Debug` output contains the credentials and the
/// token verbatim; avoid logging this value.
#[derive(Debug, Clone)]
pub struct Config {
    /// Login and password used for the HTTP login call.
    credentials: Option<(String, String)>,
    /// API key / token supplied by the caller.
    token: Option<String>,
    /// Timeout applied to connections and calls.
    timeout: Duration,
}
178
179impl Config {
180    #[inline]
181    pub fn new() -> Self {
182        Self::default()
183    }
184    /// Set API key/token
185    #[inline]
186    pub fn token(mut self, token: &str) -> Self {
187        self.token = Some(token.to_owned());
188        self
189    }
190    /// perform HTTP login with credentials
191    #[inline]
192    pub fn credentials(mut self, login: &str, password: &str) -> Self {
193        self.credentials = Some((login.to_owned(), password.to_owned()));
194        self
195    }
196    #[inline]
197    pub fn timeout(mut self, timeout: Duration) -> Self {
198        self.timeout = timeout;
199        self
200    }
201}
202
203impl Default for Config {
204    #[inline]
205    fn default() -> Self {
206        Self {
207            credentials: None,
208            token: None,
209            timeout: eva_common::DEFAULT_TIMEOUT,
210        }
211    }
212}
213
214#[allow(clippy::module_name_repetitions)]
215pub struct EvaClient {
216    name: String,
217    client: ClientKind,
218    config: Config,
219    token: Mutex<Option<Arc<String>>>,
220    token_preassigned: bool,
221    path: String,
222    request_id: atomic::AtomicU32,
223}
224
225impl EvaClient {
226    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
227        if path.starts_with("http://") || path.starts_with("https://") {
228            let https = HttpsConnector::new();
229            let http_client: hyper::Client<_> = hyper::Client::builder()
230                .pool_idle_timeout(config.timeout)
231                .build(https);
232            let token = config.token.clone();
233            let has_token = token.is_some();
234            let cl = Self {
235                name: base_name.to_owned(),
236                client: ClientKind::Http(http_client),
237                config,
238                token: Mutex::new(token.map(Arc::new)),
239                token_preassigned: has_token,
240                path: path.to_owned(),
241                request_id: atomic::AtomicU32::new(0),
242            };
243            if !has_token && let ClientKind::Http(ref client) = cl.client {
244                cl.http_login(client).await?;
245            }
246            Ok(cl)
247        } else {
248            let name = format!(
249                "{}.{}.{}",
250                base_name,
251                std::process::id(),
252                CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
253            );
254            let mut ipc_config = ipc::Config::new(path, &name).timeout(config.timeout);
255            if let Some(ref token) = config.token {
256                ipc_config = ipc_config.token(token);
257            }
258            let bus = ipc::Client::connect(&ipc_config).await?;
259            let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
260            Ok(Self {
261                name,
262                client: ClientKind::Bus(rpc),
263                config,
264                token: <_>::default(),
265                token_preassigned: false,
266                path: path.to_owned(),
267                request_id: atomic::AtomicU32::new(0),
268            })
269        }
270    }
271    #[inline]
272    pub fn name(&self) -> &str {
273        &self.name
274    }
275    pub async fn get_system_info(&self) -> EResult<SystemInfo> {
276        let info: SystemInfo = self.call0("eva.core", "test").await?;
277        Ok(info)
278    }
279    async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
280        #[derive(Serialize)]
281        struct LoginParams<'a> {
282            u: &'a str,
283            p: &'a str,
284        }
285        #[derive(Deserialize)]
286        struct LoginPayload {
287            token: String,
288        }
289        if let Some(ref creds) = self.config.credentials {
290            let p: LoginPayload = self
291                .safe_http_call(
292                    client,
293                    None,
294                    None,
295                    "login",
296                    Some(to_value(LoginParams {
297                        u: &creds.0,
298                        p: &creds.1,
299                    })?),
300                )
301                .await?;
302            let token = Arc::new(p.token);
303            self.token.lock().unwrap().replace(token.clone());
304            Ok(token)
305        } else {
306            Err(Error::access("no credentials set"))
307        }
308    }
309    pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
310        #[derive(Deserialize)]
311        struct NodeList {
312            name: String,
313            svc: Option<String>,
314        }
315        let system_name = self.get_system_info().await?.system_name;
316        let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
317        let node_map: NodeMap = node_list
318            .into_iter()
319            .filter_map(|v| v.svc.map(|s| (v.name, s)))
320            .collect();
321        Ok(EvaCloudClient::from_eva_client(
322            &system_name,
323            self,
324            node_map,
325        ))
326    }
327    pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
328    where
329        T: DeserializeOwned,
330    {
331        self.rpc_call(target, method, None::<()>).await
332    }
333    pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
334    where
335        T: DeserializeOwned,
336        V: Serialize,
337    {
338        self.rpc_call(target, method, Some(params)).await
339    }
340    /// # Panics
341    ///
342    /// Will panic if token mutex is poisoned
343    pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
344    where
345        T: DeserializeOwned,
346        V: Serialize,
347    {
348        match self.client {
349            ClientKind::Bus(ref c) => {
350                let payload: busrt::borrow::Cow = if let Some(ref p) = params {
351                    pack(p)?.into()
352                } else {
353                    busrt::empty_payload!()
354                };
355                let res = tokio::time::timeout(
356                    self.config.timeout,
357                    c.call(target, method, payload, QoS::Processed),
358                )
359                .await??;
360                let result = res.payload();
361                if result.is_empty() {
362                    Ok(T::deserialize(Value::Unit)?)
363                } else {
364                    Ok(unpack(result)?)
365                }
366            }
367            ClientKind::Http(ref client) => {
368                let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
369                let params_payload = if let Some(p) = params {
370                    Some(to_value(p)?)
371                } else {
372                    None
373                };
374                if let Some(token) = to {
375                    match self
376                        .safe_http_call(
377                            client,
378                            Some(&token),
379                            Some(target),
380                            method,
381                            params_payload.clone(),
382                        )
383                        .await
384                    {
385                        Err(e)
386                            if !self.token_preassigned
387                                && e.kind() == ErrorKind::AccessDenied
388                                && (e.message() == Some("invalid token")) =>
389                        {
390                            // repeat request with new token
391                            let token = self.http_login(client).await?;
392                            self.safe_http_call(
393                                client,
394                                Some(&token),
395                                Some(target),
396                                method,
397                                params_payload,
398                            )
399                            .await
400                        }
401                        res => res,
402                    }
403                } else {
404                    let token = self.http_login(client).await?;
405                    self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
406                        .await
407                }
408            }
409        }
410    }
411    async fn safe_http_call<T>(
412        &self,
413        client: &HttpClient,
414        token: Option<&str>,
415        target: Option<&str>,
416        method: &str,
417        params: Option<Value>,
418    ) -> EResult<T>
419    where
420        T: DeserializeOwned,
421    {
422        tokio::time::timeout(
423            self.config.timeout,
424            self.http_call(client, token, target, method, params),
425        )
426        .await?
427    }
428    async fn http_call<T>(
429        &self,
430        client: &HttpClient,
431        token: Option<&str>,
432        target: Option<&str>,
433        method: &str,
434        params: Option<Value>,
435    ) -> EResult<T>
436    where
437        T: DeserializeOwned,
438    {
439        macro_rules! params_map {
440            ($map: expr, $token: expr) => {{
441                $map.insert(
442                    Value::String("k".to_owned()),
443                    Value::String($token.to_owned()),
444                );
445                Some(Value::Map($map))
446            }};
447        }
448        let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
449        let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
450        let request = JsonRpcRequest::new(
451            Some(Value::U32(id)),
452            if let Some(ref m) = bus_method {
453                m
454            } else {
455                method
456            },
457            if let Some(tk) = token {
458                if let Some(par) = params {
459                    let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
460                    params_map!(p_map, tk)
461                } else {
462                    let mut p_map = BTreeMap::new();
463                    params_map!(p_map, tk)
464                }
465            } else {
466                params
467            },
468            rjrpc::Encoding::MsgPack,
469        );
470        let http_request = Request::builder()
471            .method(Method::POST)
472            .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
473            .uri(&self.path)
474            .body(Body::from(request.pack().map_err(Error::invalid_data)?))
475            .map_err(Error::io)?;
476        let http_res = client.request(http_request).await.map_err(Error::io)?;
477        let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
478        let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
479            .map_err(Error::invalid_data)?;
480        if u32::try_from(res.id)? == id {
481            if let Some(err) = res.error {
482                Err(Error::newc(err.code.into(), err.message))
483            } else if let Some(result) = res.result {
484                Ok(T::deserialize(result).map_err(Error::invalid_data)?)
485            } else {
486                Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
487            }
488        } else {
489            Err(Error::io("invalid JRPC response: id mismatch"))
490        }
491    }
492}
493
494enum ClientKind {
495    Bus(rpc::RpcClient),
496    Http(HttpClient),
497}