eva_client/
lib.rs

1use busrt::{
2    ipc,
3    rpc::{self, Rpc},
4    QoS,
5};
6use eva_common::events::NodeInfo;
7use eva_common::payload::{pack, unpack};
8use eva_common::prelude::*;
9use hyper::{client::HttpConnector, Body, Method, Request};
10use hyper_tls::HttpsConnector;
11use rjrpc::{JsonRpcRequest, JsonRpcResponse};
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use std::collections::BTreeMap;
14use std::fmt;
15use std::sync::atomic;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19pub type NodeMap = BTreeMap<String, String>;
20
21pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
22
23static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
24const CT_HEADER: &str = "application/msgpack";
25
26pub const LOCAL: &str = ".local";
27
28#[derive(Deserialize)]
29pub struct SystemInfo {
30    pub system_name: String,
31    pub active: bool,
32    #[serde(flatten)]
33    pub ver: VersionInfo,
34}
35
36#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
37pub struct VersionInfo {
38    pub build: u64,
39    pub version: String,
40}
41
42impl fmt::Display for VersionInfo {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        write!(f, "{} ({})", self.build, self.version)
45    }
46}
47
48impl From<NodeInfo> for VersionInfo {
49    fn from(ni: NodeInfo) -> Self {
50        Self {
51            build: ni.build,
52            version: ni.version,
53        }
54    }
55}
56
57#[inline]
58fn parse_major(ver: &str) -> EResult<u16> {
59    ver.split('.').next().unwrap().parse().map_err(Into::into)
60}
61
62impl VersionInfo {
63    #[inline]
64    pub fn major_matches(&self, ver: &str) -> EResult<bool> {
65        Ok(parse_major(ver)? == self.major()?)
66    }
67    #[inline]
68    pub fn major(&self) -> EResult<u16> {
69        parse_major(&self.version)
70    }
71}
72
73#[allow(clippy::module_name_repetitions)]
74pub struct EvaCloudClient {
75    system_name: String,
76    client: Arc<EvaClient>,
77    node_map: NodeMap,
78}
79
80impl EvaCloudClient {
81    /// Create cloud client from a local
82    pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
83        Self {
84            system_name: system_name.to_owned(),
85            client: Arc::new(client),
86            node_map,
87        }
88    }
89    /// Create cloud client from scratch
90    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
91        let local_client = EvaClient::connect(path, base_name, config).await?;
92        local_client.transform_into_cloud_client().await
93    }
94    pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
95        let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
96        Ok(info)
97    }
98    pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
99    where
100        T: DeserializeOwned,
101    {
102        self.rpc_call(LOCAL, target, method, None).await
103    }
104    pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
105    where
106        T: DeserializeOwned,
107        V: Serialize,
108    {
109        self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
110            .await
111    }
112    pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
113    where
114        T: DeserializeOwned,
115    {
116        self.rpc_call(node, target, method, None).await
117    }
118    pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
119    where
120        T: DeserializeOwned,
121        V: Serialize,
122    {
123        self.rpc_call(node, target, method, Some(to_value(params)?))
124            .await
125    }
126    pub async fn rpc_call<T>(
127        &self,
128        node: &str,
129        target: &str,
130        method: &str,
131        params: Option<Value>,
132    ) -> EResult<T>
133    where
134        T: DeserializeOwned,
135    {
136        if node == ".local" || node == self.system_name {
137            self.client.call(target, method, params).await
138        } else {
139            let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
140                BTreeMap::deserialize(p).map_err(Error::invalid_data)?
141            } else {
142                BTreeMap::new()
143            };
144            repl_params.insert(
145                Value::String("node".to_owned()),
146                Value::String(node.to_owned()),
147            );
148            self.client
149                .call(
150                    self.node_map.get(node).ok_or_else(|| {
151                        Error::failed(format!("no replication service mapped for {}", node))
152                    })?,
153                    &format!("bus::{}::{}", target, method),
154                    Some(to_value(repl_params)?),
155                )
156                .await
157        }
158    }
159    #[inline]
160    pub fn client(&self) -> &EvaClient {
161        &self.client
162    }
163    #[inline]
164    pub fn client_cloned(&self) -> Arc<EvaClient> {
165        self.client.clone()
166    }
167}
168
/// Connection settings of [`EvaClient`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Login and password, used by the HTTP transport to obtain a token.
    credentials: Option<(String, String)>,
    /// Limit applied to connecting and to every call.
    timeout: Duration,
}
174
175impl Config {
176    #[inline]
177    pub fn new() -> Self {
178        Self::default()
179    }
180    #[inline]
181    pub fn credentials(mut self, login: &str, password: &str) -> Self {
182        self.credentials = Some((login.to_owned(), password.to_owned()));
183        self
184    }
185    #[inline]
186    pub fn timeout(mut self, timeout: Duration) -> Self {
187        self.timeout = timeout;
188        self
189    }
190}
191
192impl Default for Config {
193    #[inline]
194    fn default() -> Self {
195        Self {
196            credentials: None,
197            timeout: eva_common::DEFAULT_TIMEOUT,
198        }
199    }
200}
201
202#[allow(clippy::module_name_repetitions)]
203pub struct EvaClient {
204    name: String,
205    client: ClientKind,
206    config: Config,
207    token: Mutex<Option<Arc<String>>>,
208    path: String,
209    request_id: atomic::AtomicU32,
210}
211
212impl EvaClient {
213    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
214        if path.starts_with("http://") || path.starts_with("https://") {
215            let https = HttpsConnector::new();
216            let http_client: hyper::Client<_> = hyper::Client::builder()
217                .pool_idle_timeout(config.timeout)
218                .build(https);
219            let cl = Self {
220                name: base_name.to_owned(),
221                client: ClientKind::Http(http_client),
222                config,
223                token: <_>::default(),
224                path: path.to_owned(),
225                request_id: atomic::AtomicU32::new(0),
226            };
227            if let ClientKind::Http(ref client) = cl.client {
228                cl.http_login(client).await?;
229            }
230            Ok(cl)
231        } else {
232            let name = format!(
233                "{}.{}.{}",
234                base_name,
235                std::process::id(),
236                CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
237            );
238            let bus = tokio::time::timeout(
239                config.timeout,
240                ipc::Client::connect(&ipc::Config::new(path, &name)),
241            )
242            .await??;
243            let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
244            Ok(Self {
245                name,
246                client: ClientKind::Bus(rpc),
247                config,
248                token: <_>::default(),
249                path: path.to_owned(),
250                request_id: atomic::AtomicU32::new(0),
251            })
252        }
253    }
254    #[inline]
255    pub fn name(&self) -> &str {
256        &self.name
257    }
258    pub async fn get_system_info(&self) -> EResult<SystemInfo> {
259        let info: SystemInfo = self.call0("eva.core", "test").await?;
260        Ok(info)
261    }
262    async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
263        #[derive(Serialize)]
264        struct LoginParams<'a> {
265            u: &'a str,
266            p: &'a str,
267        }
268        #[derive(Deserialize)]
269        struct LoginPayload {
270            token: String,
271        }
272        if let Some(ref creds) = self.config.credentials {
273            let p: LoginPayload = self
274                .safe_http_call(
275                    client,
276                    None,
277                    None,
278                    "login",
279                    Some(to_value(LoginParams {
280                        u: &creds.0,
281                        p: &creds.1,
282                    })?),
283                )
284                .await?;
285            let token = Arc::new(p.token);
286            self.token.lock().unwrap().replace(token.clone());
287            Ok(token)
288        } else {
289            Err(Error::access("no credentials set"))
290        }
291    }
292    pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
293        #[derive(Deserialize)]
294        struct NodeList {
295            name: String,
296            svc: Option<String>,
297        }
298        let system_name = self.get_system_info().await?.system_name;
299        let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
300        let node_map: NodeMap = node_list
301            .into_iter()
302            .filter_map(|v| v.svc.map(|s| (v.name, s)))
303            .collect();
304        Ok(EvaCloudClient::from_eva_client(
305            &system_name,
306            self,
307            node_map,
308        ))
309    }
310    pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
311    where
312        T: DeserializeOwned,
313    {
314        self.rpc_call(target, method, None::<()>).await
315    }
316    pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
317    where
318        T: DeserializeOwned,
319        V: Serialize,
320    {
321        self.rpc_call(target, method, Some(params)).await
322    }
323    /// # Panics
324    ///
325    /// Will panic if token mutex is poisoned
326    pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
327    where
328        T: DeserializeOwned,
329        V: Serialize,
330    {
331        match self.client {
332            ClientKind::Bus(ref c) => {
333                let payload: busrt::borrow::Cow = if let Some(ref p) = params {
334                    pack(p)?.into()
335                } else {
336                    busrt::empty_payload!()
337                };
338                let res = tokio::time::timeout(
339                    self.config.timeout,
340                    c.call(target, method, payload, QoS::Processed),
341                )
342                .await??;
343                let result = res.payload();
344                if result.is_empty() {
345                    Ok(T::deserialize(Value::Unit)?)
346                } else {
347                    Ok(unpack(result)?)
348                }
349            }
350            ClientKind::Http(ref client) => {
351                let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
352                let params_payload = to_value(params)?;
353                if let Some(token) = to {
354                    match self
355                        .safe_http_call(
356                            client,
357                            Some(&token),
358                            Some(target),
359                            method,
360                            Some(params_payload.clone()),
361                        )
362                        .await
363                    {
364                        Err(e)
365                            if e.kind() == ErrorKind::AccessDenied
366                                && e.message().map_or(false, |m| m == "invalid token") =>
367                        {
368                            // repeat request with new token
369                            let token = self.http_login(client).await?;
370                            self.safe_http_call(
371                                client,
372                                Some(&token),
373                                Some(target),
374                                method,
375                                Some(params_payload),
376                            )
377                            .await
378                        }
379                        res => res,
380                    }
381                } else {
382                    let token = self.http_login(client).await?;
383                    self.safe_http_call(
384                        client,
385                        Some(&token),
386                        Some(target),
387                        method,
388                        Some(params_payload),
389                    )
390                    .await
391                }
392            }
393        }
394    }
395    async fn safe_http_call<T>(
396        &self,
397        client: &HttpClient,
398        token: Option<&str>,
399        target: Option<&str>,
400        method: &str,
401        params: Option<Value>,
402    ) -> EResult<T>
403    where
404        T: DeserializeOwned,
405    {
406        tokio::time::timeout(
407            self.config.timeout,
408            self.http_call(client, token, target, method, params),
409        )
410        .await?
411    }
412    async fn http_call<T>(
413        &self,
414        client: &HttpClient,
415        token: Option<&str>,
416        target: Option<&str>,
417        method: &str,
418        params: Option<Value>,
419    ) -> EResult<T>
420    where
421        T: DeserializeOwned,
422    {
423        macro_rules! params_map {
424            ($map: expr, $token: expr) => {{
425                $map.insert(
426                    Value::String("k".to_owned()),
427                    Value::String($token.to_owned()),
428                );
429                Some(Value::Map($map))
430            }};
431        }
432        let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
433        let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
434        let request = JsonRpcRequest::new(
435            Some(Value::U32(id)),
436            if let Some(ref m) = bus_method {
437                m
438            } else {
439                method
440            },
441            if let Some(tk) = token {
442                if let Some(par) = params {
443                    let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
444                    params_map!(p_map, tk)
445                } else {
446                    let mut p_map = BTreeMap::new();
447                    params_map!(p_map, tk)
448                }
449            } else {
450                params
451            },
452            rjrpc::Encoding::MsgPack,
453        );
454        let http_request = Request::builder()
455            .method(Method::POST)
456            .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
457            .uri(&self.path)
458            .body(Body::from(request.pack().map_err(Error::invalid_data)?))
459            .map_err(Error::io)?;
460        let http_res = client.request(http_request).await.map_err(Error::io)?;
461        let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
462        let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
463            .map_err(Error::invalid_data)?;
464        if u32::try_from(res.id)? == id {
465            if let Some(err) = res.error {
466                Err(Error::newc(err.code.into(), err.message))
467            } else if let Some(result) = res.result {
468                Ok(T::deserialize(result).map_err(Error::invalid_data)?)
469            } else {
470                Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
471            }
472        } else {
473            Err(Error::io("invalid JRPC response: id mismatch"))
474        }
475    }
476}
477
478enum ClientKind {
479    Bus(rpc::RpcClient),
480    Http(HttpClient),
481}