eva_client/
lib.rs

1use busrt::{
2    ipc,
3    rpc::{self, Rpc},
4    QoS,
5};
6use eva_common::events::NodeInfo;
7use eva_common::payload::{pack, unpack};
8use eva_common::prelude::*;
9use hyper::{client::HttpConnector, Body, Method, Request};
10use hyper_tls::HttpsConnector;
11use rjrpc::{JsonRpcRequest, JsonRpcResponse};
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use std::collections::BTreeMap;
14use std::fmt;
15use std::sync::atomic;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19pub type NodeMap = BTreeMap<String, String>;
20
21pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
22
23static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
24const CT_HEADER: &str = "application/msgpack";
25
26pub const LOCAL: &str = ".local";
27
28#[derive(Deserialize)]
29pub struct SystemInfo {
30    pub system_name: String,
31    pub active: bool,
32    #[serde(flatten)]
33    pub ver: VersionInfo,
34}
35
36#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
37pub struct VersionInfo {
38    pub build: u64,
39    pub version: String,
40}
41
42impl fmt::Display for VersionInfo {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        write!(f, "{} ({})", self.build, self.version)
45    }
46}
47
48impl From<NodeInfo> for VersionInfo {
49    fn from(ni: NodeInfo) -> Self {
50        Self {
51            build: ni.build,
52            version: ni.version,
53        }
54    }
55}
56
57#[inline]
58fn parse_major(ver: &str) -> EResult<u16> {
59    ver.split('.').next().unwrap().parse().map_err(Into::into)
60}
61
62impl VersionInfo {
63    #[inline]
64    pub fn major_matches(&self, ver: &str) -> EResult<bool> {
65        Ok(parse_major(ver)? == self.major()?)
66    }
67    #[inline]
68    pub fn major(&self) -> EResult<u16> {
69        parse_major(&self.version)
70    }
71}
72
73#[allow(clippy::module_name_repetitions)]
74pub struct EvaCloudClient {
75    system_name: String,
76    client: Arc<EvaClient>,
77    node_map: NodeMap,
78}
79
80impl EvaCloudClient {
81    /// Create cloud client from a local
82    pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
83        Self {
84            system_name: system_name.to_owned(),
85            client: Arc::new(client),
86            node_map,
87        }
88    }
89    /// Create cloud client from scratch
90    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
91        let local_client = EvaClient::connect(path, base_name, config).await?;
92        local_client.transform_into_cloud_client().await
93    }
94    pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
95        let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
96        Ok(info)
97    }
98    pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
99    where
100        T: DeserializeOwned,
101    {
102        self.rpc_call(LOCAL, target, method, None).await
103    }
104    pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
105    where
106        T: DeserializeOwned,
107        V: Serialize,
108    {
109        self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
110            .await
111    }
112    pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
113    where
114        T: DeserializeOwned,
115    {
116        self.rpc_call(node, target, method, None).await
117    }
118    pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
119    where
120        T: DeserializeOwned,
121        V: Serialize,
122    {
123        self.rpc_call(node, target, method, Some(to_value(params)?))
124            .await
125    }
126    pub async fn rpc_call<T>(
127        &self,
128        node: &str,
129        target: &str,
130        method: &str,
131        params: Option<Value>,
132    ) -> EResult<T>
133    where
134        T: DeserializeOwned,
135    {
136        if node == ".local" || node == self.system_name {
137            self.client.call(target, method, params).await
138        } else {
139            let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
140                BTreeMap::deserialize(p).map_err(Error::invalid_data)?
141            } else {
142                BTreeMap::new()
143            };
144            repl_params.insert(
145                Value::String("node".to_owned()),
146                Value::String(node.to_owned()),
147            );
148            self.client
149                .call(
150                    self.node_map.get(node).ok_or_else(|| {
151                        Error::failed(format!("no replication service mapped for {}", node))
152                    })?,
153                    &format!("bus::{}::{}", target, method),
154                    Some(to_value(repl_params)?),
155                )
156                .await
157        }
158    }
159    #[inline]
160    pub fn client(&self) -> &EvaClient {
161        &self.client
162    }
163    #[inline]
164    pub fn client_cloned(&self) -> Arc<EvaClient> {
165        self.client.clone()
166    }
167}
168
169#[derive(Debug, Clone)]
170pub struct Config {
171    credentials: Option<(String, String)>,
172    token: Option<String>,
173    timeout: Duration,
174}
175
176impl Config {
177    #[inline]
178    pub fn new() -> Self {
179        Self::default()
180    }
181    /// Set API key/token
182    #[inline]
183    pub fn token(mut self, token: &str) -> Self {
184        self.token = Some(token.to_owned());
185        self
186    }
187    /// perform HTTP login with credentials
188    #[inline]
189    pub fn credentials(mut self, login: &str, password: &str) -> Self {
190        self.credentials = Some((login.to_owned(), password.to_owned()));
191        self
192    }
193    #[inline]
194    pub fn timeout(mut self, timeout: Duration) -> Self {
195        self.timeout = timeout;
196        self
197    }
198}
199
200impl Default for Config {
201    #[inline]
202    fn default() -> Self {
203        Self {
204            credentials: None,
205            token: None,
206            timeout: eva_common::DEFAULT_TIMEOUT,
207        }
208    }
209}
210
211#[allow(clippy::module_name_repetitions)]
212pub struct EvaClient {
213    name: String,
214    client: ClientKind,
215    config: Config,
216    token: Mutex<Option<Arc<String>>>,
217    token_preassigned: bool,
218    path: String,
219    request_id: atomic::AtomicU32,
220}
221
222impl EvaClient {
223    pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
224        if path.starts_with("http://") || path.starts_with("https://") {
225            let https = HttpsConnector::new();
226            let http_client: hyper::Client<_> = hyper::Client::builder()
227                .pool_idle_timeout(config.timeout)
228                .build(https);
229            let token = config.token.clone();
230            let has_token = token.is_some();
231            let cl = Self {
232                name: base_name.to_owned(),
233                client: ClientKind::Http(http_client),
234                config,
235                token: Mutex::new(token.map(Arc::new)),
236                token_preassigned: has_token,
237                path: path.to_owned(),
238                request_id: atomic::AtomicU32::new(0),
239            };
240            if !has_token {
241                if let ClientKind::Http(ref client) = cl.client {
242                    cl.http_login(client).await?;
243                }
244            }
245            Ok(cl)
246        } else {
247            let name = format!(
248                "{}.{}.{}",
249                base_name,
250                std::process::id(),
251                CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
252            );
253            let bus = ipc::Client::connect(&ipc::Config::new(path, &name).timeout(config.timeout))
254                .await?;
255            let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
256            Ok(Self {
257                name,
258                client: ClientKind::Bus(rpc),
259                config,
260                token: <_>::default(),
261                token_preassigned: false,
262                path: path.to_owned(),
263                request_id: atomic::AtomicU32::new(0),
264            })
265        }
266    }
267    #[inline]
268    pub fn name(&self) -> &str {
269        &self.name
270    }
271    pub async fn get_system_info(&self) -> EResult<SystemInfo> {
272        let info: SystemInfo = self.call0("eva.core", "test").await?;
273        Ok(info)
274    }
275    async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
276        #[derive(Serialize)]
277        struct LoginParams<'a> {
278            u: &'a str,
279            p: &'a str,
280        }
281        #[derive(Deserialize)]
282        struct LoginPayload {
283            token: String,
284        }
285        if let Some(ref creds) = self.config.credentials {
286            let p: LoginPayload = self
287                .safe_http_call(
288                    client,
289                    None,
290                    None,
291                    "login",
292                    Some(to_value(LoginParams {
293                        u: &creds.0,
294                        p: &creds.1,
295                    })?),
296                )
297                .await?;
298            let token = Arc::new(p.token);
299            self.token.lock().unwrap().replace(token.clone());
300            Ok(token)
301        } else {
302            Err(Error::access("no credentials set"))
303        }
304    }
305    pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
306        #[derive(Deserialize)]
307        struct NodeList {
308            name: String,
309            svc: Option<String>,
310        }
311        let system_name = self.get_system_info().await?.system_name;
312        let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
313        let node_map: NodeMap = node_list
314            .into_iter()
315            .filter_map(|v| v.svc.map(|s| (v.name, s)))
316            .collect();
317        Ok(EvaCloudClient::from_eva_client(
318            &system_name,
319            self,
320            node_map,
321        ))
322    }
323    pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
324    where
325        T: DeserializeOwned,
326    {
327        self.rpc_call(target, method, None::<()>).await
328    }
329    pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
330    where
331        T: DeserializeOwned,
332        V: Serialize,
333    {
334        self.rpc_call(target, method, Some(params)).await
335    }
336    /// # Panics
337    ///
338    /// Will panic if token mutex is poisoned
339    pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
340    where
341        T: DeserializeOwned,
342        V: Serialize,
343    {
344        match self.client {
345            ClientKind::Bus(ref c) => {
346                let payload: busrt::borrow::Cow = if let Some(ref p) = params {
347                    pack(p)?.into()
348                } else {
349                    busrt::empty_payload!()
350                };
351                let res = tokio::time::timeout(
352                    self.config.timeout,
353                    c.call(target, method, payload, QoS::Processed),
354                )
355                .await??;
356                let result = res.payload();
357                if result.is_empty() {
358                    Ok(T::deserialize(Value::Unit)?)
359                } else {
360                    Ok(unpack(result)?)
361                }
362            }
363            ClientKind::Http(ref client) => {
364                let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
365                let params_payload = if let Some(p) = params {
366                    Some(to_value(p)?)
367                } else {
368                    None
369                };
370                if let Some(token) = to {
371                    match self
372                        .safe_http_call(
373                            client,
374                            Some(&token),
375                            Some(target),
376                            method,
377                            params_payload.clone(),
378                        )
379                        .await
380                    {
381                        Err(e)
382                            if !self.token_preassigned
383                                && e.kind() == ErrorKind::AccessDenied
384                                && (e.message() == Some("invalid token")) =>
385                        {
386                            // repeat request with new token
387                            let token = self.http_login(client).await?;
388                            self.safe_http_call(
389                                client,
390                                Some(&token),
391                                Some(target),
392                                method,
393                                params_payload,
394                            )
395                            .await
396                        }
397                        res => res,
398                    }
399                } else {
400                    let token = self.http_login(client).await?;
401                    self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
402                        .await
403                }
404            }
405        }
406    }
407    async fn safe_http_call<T>(
408        &self,
409        client: &HttpClient,
410        token: Option<&str>,
411        target: Option<&str>,
412        method: &str,
413        params: Option<Value>,
414    ) -> EResult<T>
415    where
416        T: DeserializeOwned,
417    {
418        tokio::time::timeout(
419            self.config.timeout,
420            self.http_call(client, token, target, method, params),
421        )
422        .await?
423    }
424    async fn http_call<T>(
425        &self,
426        client: &HttpClient,
427        token: Option<&str>,
428        target: Option<&str>,
429        method: &str,
430        params: Option<Value>,
431    ) -> EResult<T>
432    where
433        T: DeserializeOwned,
434    {
435        macro_rules! params_map {
436            ($map: expr, $token: expr) => {{
437                $map.insert(
438                    Value::String("k".to_owned()),
439                    Value::String($token.to_owned()),
440                );
441                Some(Value::Map($map))
442            }};
443        }
444        let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
445        let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
446        let request = JsonRpcRequest::new(
447            Some(Value::U32(id)),
448            if let Some(ref m) = bus_method {
449                m
450            } else {
451                method
452            },
453            if let Some(tk) = token {
454                if let Some(par) = params {
455                    let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
456                    params_map!(p_map, tk)
457                } else {
458                    let mut p_map = BTreeMap::new();
459                    params_map!(p_map, tk)
460                }
461            } else {
462                params
463            },
464            rjrpc::Encoding::MsgPack,
465        );
466        let http_request = Request::builder()
467            .method(Method::POST)
468            .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
469            .uri(&self.path)
470            .body(Body::from(request.pack().map_err(Error::invalid_data)?))
471            .map_err(Error::io)?;
472        let http_res = client.request(http_request).await.map_err(Error::io)?;
473        let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
474        let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
475            .map_err(Error::invalid_data)?;
476        if u32::try_from(res.id)? == id {
477            if let Some(err) = res.error {
478                Err(Error::newc(err.code.into(), err.message))
479            } else if let Some(result) = res.result {
480                Ok(T::deserialize(result).map_err(Error::invalid_data)?)
481            } else {
482                Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
483            }
484        } else {
485            Err(Error::io("invalid JRPC response: id mismatch"))
486        }
487    }
488}
489
490enum ClientKind {
491    Bus(rpc::RpcClient),
492    Http(HttpClient),
493}