1use busrt::{
2 QoS, ipc,
3 rpc::{self, Rpc},
4};
5use eva_common::events::NodeInfo;
6use eva_common::payload::{pack, unpack};
7use eva_common::prelude::*;
8use hyper::{Body, Method, Request, client::HttpConnector};
9use hyper_tls::HttpsConnector;
10use rjrpc::{JsonRpcRequest, JsonRpcResponse};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::collections::BTreeMap;
13use std::fmt;
14use std::sync::atomic;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
/// Maps a remote node name to the name of the replication service used to reach it.
pub type NodeMap = BTreeMap<String, String>;

/// HTTP client type used for `http://` and `https://` connection paths.
pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;

/// Process-wide counter; its value is appended to bus client names on every bus connection.
static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
/// `Content-Type` header value sent with HTTP requests.
const CT_HEADER: &str = "application/msgpack";

/// Node name which refers to the node the client itself is connected to.
pub const LOCAL: &str = ".local";
26
27#[derive(Deserialize)]
28pub struct SystemInfo {
29 pub system_name: String,
30 pub active: bool,
31 #[serde(flatten)]
32 pub ver: VersionInfo,
33}
34
/// Build number and version string of a node.
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
pub struct VersionInfo {
    /// Numeric build identifier.
    pub build: u64,
    /// Version string; the text before the first `.` is treated as the major version.
    pub version: String,
}
40
41impl fmt::Display for VersionInfo {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 write!(f, "{} ({})", self.build, self.version)
44 }
45}
46
47impl From<NodeInfo> for VersionInfo {
48 fn from(ni: NodeInfo) -> Self {
49 Self {
50 build: ni.build,
51 version: ni.version,
52 }
53 }
54}
55
56#[inline]
57fn parse_major(ver: &str) -> EResult<u16> {
58 ver.split('.').next().unwrap().parse().map_err(Into::into)
59}
60
61impl VersionInfo {
62 #[inline]
63 pub fn major_matches(&self, ver: &str) -> EResult<bool> {
64 Ok(parse_major(ver)? == self.major()?)
65 }
66 #[inline]
67 pub fn major(&self) -> EResult<u16> {
68 parse_major(&self.version)
69 }
70}
71
/// Client which calls either the node it is connected to or, via the replication services listed
/// in its node map, other nodes.
#[allow(clippy::module_name_repetitions)]
pub struct EvaCloudClient {
    /// Name of the node the underlying client is connected to.
    system_name: String,
    /// Shared handle to the underlying single-node client.
    client: Arc<EvaClient>,
    /// Remote node name -> replication service name.
    node_map: NodeMap,
}
78
79impl EvaCloudClient {
80 pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
82 Self {
83 system_name: system_name.to_owned(),
84 client: Arc::new(client),
85 node_map,
86 }
87 }
88 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
90 let local_client = EvaClient::connect(path, base_name, config).await?;
91 local_client.transform_into_cloud_client().await
92 }
93 pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
94 let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
95 Ok(info)
96 }
97 pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
98 where
99 T: DeserializeOwned,
100 {
101 self.rpc_call(LOCAL, target, method, None).await
102 }
103 pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
104 where
105 T: DeserializeOwned,
106 V: Serialize,
107 {
108 self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
109 .await
110 }
111 pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
112 where
113 T: DeserializeOwned,
114 {
115 self.rpc_call(node, target, method, None).await
116 }
117 pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
118 where
119 T: DeserializeOwned,
120 V: Serialize,
121 {
122 self.rpc_call(node, target, method, Some(to_value(params)?))
123 .await
124 }
125 pub async fn rpc_call<T>(
126 &self,
127 node: &str,
128 target: &str,
129 method: &str,
130 params: Option<Value>,
131 ) -> EResult<T>
132 where
133 T: DeserializeOwned,
134 {
135 if node == ".local" || node == self.system_name {
136 if let Some(params) = params {
137 self.client.call(target, method, params).await
138 } else {
139 self.client.call0(target, method).await
140 }
141 } else {
142 let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
143 BTreeMap::deserialize(p).map_err(Error::invalid_data)?
144 } else {
145 BTreeMap::new()
146 };
147 repl_params.insert(
148 Value::String("node".to_owned()),
149 Value::String(node.to_owned()),
150 );
151 self.client
152 .call(
153 self.node_map.get(node).ok_or_else(|| {
154 Error::failed(format!("no replication service mapped for {}", node))
155 })?,
156 &format!("bus::{}::{}", target, method),
157 Some(to_value(repl_params)?),
158 )
159 .await
160 }
161 }
162 #[inline]
163 pub fn client(&self) -> &EvaClient {
164 &self.client
165 }
166 #[inline]
167 pub fn client_cloned(&self) -> Arc<EvaClient> {
168 self.client.clone()
169 }
170}
171
/// Connection options: optional login credentials, an optional token and a timeout.
///
/// The `Debug` output never contains the password or the token, so a configuration can be logged
/// without leaking secrets.
#[derive(Clone)]
pub struct Config {
    /// Login and password.
    credentials: Option<(String, String)>,
    /// Token supplied by the caller.
    token: Option<String>,
    /// Timeout applied to connections and calls.
    timeout: Duration,
}

impl fmt::Debug for Config {
    /// Formats like the derived implementation, with the password and the token replaced by a
    /// placeholder.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const REDACTED: &str = "<redacted>";
        let credentials = self
            .credentials
            .as_ref()
            .map(|(login, _)| (login.as_str(), REDACTED));
        let token = self.token.as_ref().map(|_| REDACTED);
        f.debug_struct("Config")
            .field("credentials", &credentials)
            .field("token", &token)
            .field("timeout", &self.timeout)
            .finish()
    }
}
178
179impl Config {
180 #[inline]
181 pub fn new() -> Self {
182 Self::default()
183 }
184 #[inline]
186 pub fn token(mut self, token: &str) -> Self {
187 self.token = Some(token.to_owned());
188 self
189 }
190 #[inline]
192 pub fn credentials(mut self, login: &str, password: &str) -> Self {
193 self.credentials = Some((login.to_owned(), password.to_owned()));
194 self
195 }
196 #[inline]
197 pub fn timeout(mut self, timeout: Duration) -> Self {
198 self.timeout = timeout;
199 self
200 }
201}
202
203impl Default for Config {
204 #[inline]
205 fn default() -> Self {
206 Self {
207 credentials: None,
208 token: None,
209 timeout: eva_common::DEFAULT_TIMEOUT,
210 }
211 }
212}
213
/// Client connected to a single node, either via the bus or via HTTP.
#[allow(clippy::module_name_repetitions)]
pub struct EvaClient {
    /// Client name: `<base_name>.<pid>.<counter>` for bus connections, the base name as-is for
    /// HTTP connections.
    name: String,
    /// Transport the calls are sent through.
    client: ClientKind,
    /// Options the client has been created with.
    config: Config,
    /// Current HTTP token; stays empty for bus connections.
    token: Mutex<Option<Arc<String>>>,
    /// Set when the HTTP token came from the configuration; such a token is never replaced by an
    /// automatic re-login.
    token_preassigned: bool,
    /// Path the client has been connected to.
    path: String,
    /// Source of the ids of HTTP JSON-RPC requests.
    request_id: atomic::AtomicU32,
}
224
225impl EvaClient {
226 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
227 if path.starts_with("http://") || path.starts_with("https://") {
228 let https = HttpsConnector::new();
229 let http_client: hyper::Client<_> = hyper::Client::builder()
230 .pool_idle_timeout(config.timeout)
231 .build(https);
232 let token = config.token.clone();
233 let has_token = token.is_some();
234 let cl = Self {
235 name: base_name.to_owned(),
236 client: ClientKind::Http(http_client),
237 config,
238 token: Mutex::new(token.map(Arc::new)),
239 token_preassigned: has_token,
240 path: path.to_owned(),
241 request_id: atomic::AtomicU32::new(0),
242 };
243 if !has_token && let ClientKind::Http(ref client) = cl.client {
244 cl.http_login(client).await?;
245 }
246 Ok(cl)
247 } else {
248 let name = format!(
249 "{}.{}.{}",
250 base_name,
251 std::process::id(),
252 CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
253 );
254 let mut ipc_config = ipc::Config::new(path, &name).timeout(config.timeout);
255 if let Some(ref token) = config.token {
256 ipc_config = ipc_config.token(token);
257 }
258 let bus = ipc::Client::connect(&ipc_config).await?;
259 let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
260 Ok(Self {
261 name,
262 client: ClientKind::Bus(rpc),
263 config,
264 token: <_>::default(),
265 token_preassigned: false,
266 path: path.to_owned(),
267 request_id: atomic::AtomicU32::new(0),
268 })
269 }
270 }
271 #[inline]
272 pub fn name(&self) -> &str {
273 &self.name
274 }
275 pub async fn get_system_info(&self) -> EResult<SystemInfo> {
276 let info: SystemInfo = self.call0("eva.core", "test").await?;
277 Ok(info)
278 }
279 async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
280 #[derive(Serialize)]
281 struct LoginParams<'a> {
282 u: &'a str,
283 p: &'a str,
284 }
285 #[derive(Deserialize)]
286 struct LoginPayload {
287 token: String,
288 }
289 if let Some(ref creds) = self.config.credentials {
290 let p: LoginPayload = self
291 .safe_http_call(
292 client,
293 None,
294 None,
295 "login",
296 Some(to_value(LoginParams {
297 u: &creds.0,
298 p: &creds.1,
299 })?),
300 )
301 .await?;
302 let token = Arc::new(p.token);
303 self.token.lock().unwrap().replace(token.clone());
304 Ok(token)
305 } else {
306 Err(Error::access("no credentials set"))
307 }
308 }
309 pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
310 #[derive(Deserialize)]
311 struct NodeList {
312 name: String,
313 svc: Option<String>,
314 }
315 let system_name = self.get_system_info().await?.system_name;
316 let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
317 let node_map: NodeMap = node_list
318 .into_iter()
319 .filter_map(|v| v.svc.map(|s| (v.name, s)))
320 .collect();
321 Ok(EvaCloudClient::from_eva_client(
322 &system_name,
323 self,
324 node_map,
325 ))
326 }
327 pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
328 where
329 T: DeserializeOwned,
330 {
331 self.rpc_call(target, method, None::<()>).await
332 }
333 pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
334 where
335 T: DeserializeOwned,
336 V: Serialize,
337 {
338 self.rpc_call(target, method, Some(params)).await
339 }
340 pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
344 where
345 T: DeserializeOwned,
346 V: Serialize,
347 {
348 match self.client {
349 ClientKind::Bus(ref c) => {
350 let payload: busrt::borrow::Cow = if let Some(ref p) = params {
351 pack(p)?.into()
352 } else {
353 busrt::empty_payload!()
354 };
355 let res = tokio::time::timeout(
356 self.config.timeout,
357 c.call(target, method, payload, QoS::Processed),
358 )
359 .await??;
360 let result = res.payload();
361 if result.is_empty() {
362 Ok(T::deserialize(Value::Unit)?)
363 } else {
364 Ok(unpack(result)?)
365 }
366 }
367 ClientKind::Http(ref client) => {
368 let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
369 let params_payload = if let Some(p) = params {
370 Some(to_value(p)?)
371 } else {
372 None
373 };
374 if let Some(token) = to {
375 match self
376 .safe_http_call(
377 client,
378 Some(&token),
379 Some(target),
380 method,
381 params_payload.clone(),
382 )
383 .await
384 {
385 Err(e)
386 if !self.token_preassigned
387 && e.kind() == ErrorKind::AccessDenied
388 && (e.message() == Some("invalid token")) =>
389 {
390 let token = self.http_login(client).await?;
392 self.safe_http_call(
393 client,
394 Some(&token),
395 Some(target),
396 method,
397 params_payload,
398 )
399 .await
400 }
401 res => res,
402 }
403 } else {
404 let token = self.http_login(client).await?;
405 self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
406 .await
407 }
408 }
409 }
410 }
411 async fn safe_http_call<T>(
412 &self,
413 client: &HttpClient,
414 token: Option<&str>,
415 target: Option<&str>,
416 method: &str,
417 params: Option<Value>,
418 ) -> EResult<T>
419 where
420 T: DeserializeOwned,
421 {
422 tokio::time::timeout(
423 self.config.timeout,
424 self.http_call(client, token, target, method, params),
425 )
426 .await?
427 }
428 async fn http_call<T>(
429 &self,
430 client: &HttpClient,
431 token: Option<&str>,
432 target: Option<&str>,
433 method: &str,
434 params: Option<Value>,
435 ) -> EResult<T>
436 where
437 T: DeserializeOwned,
438 {
439 macro_rules! params_map {
440 ($map: expr, $token: expr) => {{
441 $map.insert(
442 Value::String("k".to_owned()),
443 Value::String($token.to_owned()),
444 );
445 Some(Value::Map($map))
446 }};
447 }
448 let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
449 let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
450 let request = JsonRpcRequest::new(
451 Some(Value::U32(id)),
452 if let Some(ref m) = bus_method {
453 m
454 } else {
455 method
456 },
457 if let Some(tk) = token {
458 if let Some(par) = params {
459 let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
460 params_map!(p_map, tk)
461 } else {
462 let mut p_map = BTreeMap::new();
463 params_map!(p_map, tk)
464 }
465 } else {
466 params
467 },
468 rjrpc::Encoding::MsgPack,
469 );
470 let http_request = Request::builder()
471 .method(Method::POST)
472 .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
473 .uri(&self.path)
474 .body(Body::from(request.pack().map_err(Error::invalid_data)?))
475 .map_err(Error::io)?;
476 let http_res = client.request(http_request).await.map_err(Error::io)?;
477 let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
478 let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
479 .map_err(Error::invalid_data)?;
480 if u32::try_from(res.id)? == id {
481 if let Some(err) = res.error {
482 Err(Error::newc(err.code.into(), err.message))
483 } else if let Some(result) = res.result {
484 Ok(T::deserialize(result).map_err(Error::invalid_data)?)
485 } else {
486 Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
487 }
488 } else {
489 Err(Error::io("invalid JRPC response: id mismatch"))
490 }
491 }
492}
493
/// Transport used by an `EvaClient`.
enum ClientKind {
    /// RPC client on top of a bus IPC connection.
    Bus(rpc::RpcClient),
    /// HTTP client; calls are sent as MsgPack-encoded JSON-RPC requests.
    Http(HttpClient),
}