1use busrt::{
2 QoS, ipc,
3 rpc::{self, Rpc},
4};
5use eva_common::events::NodeInfo;
6use eva_common::payload::{pack, unpack};
7use eva_common::prelude::*;
8use hyper::{Body, Method, Request, client::HttpConnector};
9use hyper_tls::HttpsConnector;
10use rjrpc::{JsonRpcRequest, JsonRpcResponse};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::collections::BTreeMap;
13use std::fmt;
14use std::sync::atomic;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
18pub type NodeMap = BTreeMap<String, String>;
19
20pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
21
22static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
23const CT_HEADER: &str = "application/msgpack";
24
25pub const LOCAL: &str = ".local";
26
27#[derive(Deserialize)]
28pub struct SystemInfo {
29 pub system_name: String,
30 pub active: bool,
31 #[serde(flatten)]
32 pub ver: VersionInfo,
33}
34
35#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
36pub struct VersionInfo {
37 pub build: u64,
38 pub version: String,
39}
40
41impl fmt::Display for VersionInfo {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 write!(f, "{} ({})", self.build, self.version)
44 }
45}
46
47impl From<NodeInfo> for VersionInfo {
48 fn from(ni: NodeInfo) -> Self {
49 Self {
50 build: ni.build,
51 version: ni.version,
52 }
53 }
54}
55
56#[inline]
57fn parse_major(ver: &str) -> EResult<u16> {
58 ver.split('.').next().unwrap().parse().map_err(Into::into)
59}
60
61impl VersionInfo {
62 #[inline]
63 pub fn major_matches(&self, ver: &str) -> EResult<bool> {
64 Ok(parse_major(ver)? == self.major()?)
65 }
66 #[inline]
67 pub fn major(&self) -> EResult<u16> {
68 parse_major(&self.version)
69 }
70}
71
72#[allow(clippy::module_name_repetitions)]
73pub struct EvaCloudClient {
74 system_name: String,
75 client: Arc<EvaClient>,
76 node_map: NodeMap,
77}
78
79impl EvaCloudClient {
80 pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
82 Self {
83 system_name: system_name.to_owned(),
84 client: Arc::new(client),
85 node_map,
86 }
87 }
88 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
90 let local_client = EvaClient::connect(path, base_name, config).await?;
91 local_client.transform_into_cloud_client().await
92 }
93 pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
94 let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
95 Ok(info)
96 }
97 pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
98 where
99 T: DeserializeOwned,
100 {
101 self.rpc_call(LOCAL, target, method, None).await
102 }
103 pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
104 where
105 T: DeserializeOwned,
106 V: Serialize,
107 {
108 self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
109 .await
110 }
111 pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
112 where
113 T: DeserializeOwned,
114 {
115 self.rpc_call(node, target, method, None).await
116 }
117 pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
118 where
119 T: DeserializeOwned,
120 V: Serialize,
121 {
122 self.rpc_call(node, target, method, Some(to_value(params)?))
123 .await
124 }
125 pub async fn rpc_call<T>(
126 &self,
127 node: &str,
128 target: &str,
129 method: &str,
130 params: Option<Value>,
131 ) -> EResult<T>
132 where
133 T: DeserializeOwned,
134 {
135 if node == ".local" || node == self.system_name {
136 self.client.call(target, method, params).await
137 } else {
138 let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
139 BTreeMap::deserialize(p).map_err(Error::invalid_data)?
140 } else {
141 BTreeMap::new()
142 };
143 repl_params.insert(
144 Value::String("node".to_owned()),
145 Value::String(node.to_owned()),
146 );
147 self.client
148 .call(
149 self.node_map.get(node).ok_or_else(|| {
150 Error::failed(format!("no replication service mapped for {}", node))
151 })?,
152 &format!("bus::{}::{}", target, method),
153 Some(to_value(repl_params)?),
154 )
155 .await
156 }
157 }
158 #[inline]
159 pub fn client(&self) -> &EvaClient {
160 &self.client
161 }
162 #[inline]
163 pub fn client_cloned(&self) -> Arc<EvaClient> {
164 self.client.clone()
165 }
166}
167
168#[derive(Debug, Clone)]
169pub struct Config {
170 credentials: Option<(String, String)>,
171 token: Option<String>,
172 timeout: Duration,
173}
174
175impl Config {
176 #[inline]
177 pub fn new() -> Self {
178 Self::default()
179 }
180 #[inline]
182 pub fn token(mut self, token: &str) -> Self {
183 self.token = Some(token.to_owned());
184 self
185 }
186 #[inline]
188 pub fn credentials(mut self, login: &str, password: &str) -> Self {
189 self.credentials = Some((login.to_owned(), password.to_owned()));
190 self
191 }
192 #[inline]
193 pub fn timeout(mut self, timeout: Duration) -> Self {
194 self.timeout = timeout;
195 self
196 }
197}
198
199impl Default for Config {
200 #[inline]
201 fn default() -> Self {
202 Self {
203 credentials: None,
204 token: None,
205 timeout: eva_common::DEFAULT_TIMEOUT,
206 }
207 }
208}
209
210#[allow(clippy::module_name_repetitions)]
211pub struct EvaClient {
212 name: String,
213 client: ClientKind,
214 config: Config,
215 token: Mutex<Option<Arc<String>>>,
216 token_preassigned: bool,
217 path: String,
218 request_id: atomic::AtomicU32,
219}
220
221impl EvaClient {
222 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
223 if path.starts_with("http://") || path.starts_with("https://") {
224 let https = HttpsConnector::new();
225 let http_client: hyper::Client<_> = hyper::Client::builder()
226 .pool_idle_timeout(config.timeout)
227 .build(https);
228 let token = config.token.clone();
229 let has_token = token.is_some();
230 let cl = Self {
231 name: base_name.to_owned(),
232 client: ClientKind::Http(http_client),
233 config,
234 token: Mutex::new(token.map(Arc::new)),
235 token_preassigned: has_token,
236 path: path.to_owned(),
237 request_id: atomic::AtomicU32::new(0),
238 };
239 if !has_token && let ClientKind::Http(ref client) = cl.client {
240 cl.http_login(client).await?;
241 }
242 Ok(cl)
243 } else {
244 let name = format!(
245 "{}.{}.{}",
246 base_name,
247 std::process::id(),
248 CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
249 );
250 let mut ipc_config = ipc::Config::new(path, &name).timeout(config.timeout);
251 if let Some(ref token) = config.token {
252 ipc_config = ipc_config.token(token);
253 }
254 let bus = ipc::Client::connect(&ipc_config).await?;
255 let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
256 Ok(Self {
257 name,
258 client: ClientKind::Bus(rpc),
259 config,
260 token: <_>::default(),
261 token_preassigned: false,
262 path: path.to_owned(),
263 request_id: atomic::AtomicU32::new(0),
264 })
265 }
266 }
267 #[inline]
268 pub fn name(&self) -> &str {
269 &self.name
270 }
271 pub async fn get_system_info(&self) -> EResult<SystemInfo> {
272 let info: SystemInfo = self.call0("eva.core", "test").await?;
273 Ok(info)
274 }
275 async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
276 #[derive(Serialize)]
277 struct LoginParams<'a> {
278 u: &'a str,
279 p: &'a str,
280 }
281 #[derive(Deserialize)]
282 struct LoginPayload {
283 token: String,
284 }
285 if let Some(ref creds) = self.config.credentials {
286 let p: LoginPayload = self
287 .safe_http_call(
288 client,
289 None,
290 None,
291 "login",
292 Some(to_value(LoginParams {
293 u: &creds.0,
294 p: &creds.1,
295 })?),
296 )
297 .await?;
298 let token = Arc::new(p.token);
299 self.token.lock().unwrap().replace(token.clone());
300 Ok(token)
301 } else {
302 Err(Error::access("no credentials set"))
303 }
304 }
305 pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
306 #[derive(Deserialize)]
307 struct NodeList {
308 name: String,
309 svc: Option<String>,
310 }
311 let system_name = self.get_system_info().await?.system_name;
312 let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
313 let node_map: NodeMap = node_list
314 .into_iter()
315 .filter_map(|v| v.svc.map(|s| (v.name, s)))
316 .collect();
317 Ok(EvaCloudClient::from_eva_client(
318 &system_name,
319 self,
320 node_map,
321 ))
322 }
323 pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
324 where
325 T: DeserializeOwned,
326 {
327 self.rpc_call(target, method, None::<()>).await
328 }
329 pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
330 where
331 T: DeserializeOwned,
332 V: Serialize,
333 {
334 self.rpc_call(target, method, Some(params)).await
335 }
336 pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
340 where
341 T: DeserializeOwned,
342 V: Serialize,
343 {
344 match self.client {
345 ClientKind::Bus(ref c) => {
346 let payload: busrt::borrow::Cow = if let Some(ref p) = params {
347 pack(p)?.into()
348 } else {
349 busrt::empty_payload!()
350 };
351 let res = tokio::time::timeout(
352 self.config.timeout,
353 c.call(target, method, payload, QoS::Processed),
354 )
355 .await??;
356 let result = res.payload();
357 if result.is_empty() {
358 Ok(T::deserialize(Value::Unit)?)
359 } else {
360 Ok(unpack(result)?)
361 }
362 }
363 ClientKind::Http(ref client) => {
364 let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
365 let params_payload = if let Some(p) = params {
366 Some(to_value(p)?)
367 } else {
368 None
369 };
370 if let Some(token) = to {
371 match self
372 .safe_http_call(
373 client,
374 Some(&token),
375 Some(target),
376 method,
377 params_payload.clone(),
378 )
379 .await
380 {
381 Err(e)
382 if !self.token_preassigned
383 && e.kind() == ErrorKind::AccessDenied
384 && (e.message() == Some("invalid token")) =>
385 {
386 let token = self.http_login(client).await?;
388 self.safe_http_call(
389 client,
390 Some(&token),
391 Some(target),
392 method,
393 params_payload,
394 )
395 .await
396 }
397 res => res,
398 }
399 } else {
400 let token = self.http_login(client).await?;
401 self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
402 .await
403 }
404 }
405 }
406 }
407 async fn safe_http_call<T>(
408 &self,
409 client: &HttpClient,
410 token: Option<&str>,
411 target: Option<&str>,
412 method: &str,
413 params: Option<Value>,
414 ) -> EResult<T>
415 where
416 T: DeserializeOwned,
417 {
418 tokio::time::timeout(
419 self.config.timeout,
420 self.http_call(client, token, target, method, params),
421 )
422 .await?
423 }
424 async fn http_call<T>(
425 &self,
426 client: &HttpClient,
427 token: Option<&str>,
428 target: Option<&str>,
429 method: &str,
430 params: Option<Value>,
431 ) -> EResult<T>
432 where
433 T: DeserializeOwned,
434 {
435 macro_rules! params_map {
436 ($map: expr, $token: expr) => {{
437 $map.insert(
438 Value::String("k".to_owned()),
439 Value::String($token.to_owned()),
440 );
441 Some(Value::Map($map))
442 }};
443 }
444 let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
445 let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
446 let request = JsonRpcRequest::new(
447 Some(Value::U32(id)),
448 if let Some(ref m) = bus_method {
449 m
450 } else {
451 method
452 },
453 if let Some(tk) = token {
454 if let Some(par) = params {
455 let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
456 params_map!(p_map, tk)
457 } else {
458 let mut p_map = BTreeMap::new();
459 params_map!(p_map, tk)
460 }
461 } else {
462 params
463 },
464 rjrpc::Encoding::MsgPack,
465 );
466 let http_request = Request::builder()
467 .method(Method::POST)
468 .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
469 .uri(&self.path)
470 .body(Body::from(request.pack().map_err(Error::invalid_data)?))
471 .map_err(Error::io)?;
472 let http_res = client.request(http_request).await.map_err(Error::io)?;
473 let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
474 let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
475 .map_err(Error::invalid_data)?;
476 if u32::try_from(res.id)? == id {
477 if let Some(err) = res.error {
478 Err(Error::newc(err.code.into(), err.message))
479 } else if let Some(result) = res.result {
480 Ok(T::deserialize(result).map_err(Error::invalid_data)?)
481 } else {
482 Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
483 }
484 } else {
485 Err(Error::io("invalid JRPC response: id mismatch"))
486 }
487 }
488}
489
490enum ClientKind {
491 Bus(rpc::RpcClient),
492 Http(HttpClient),
493}