1use busrt::{
2 QoS, ipc,
3 rpc::{self, Rpc},
4};
5use eva_common::events::NodeInfo;
6use eva_common::payload::{pack, unpack};
7use eva_common::prelude::*;
8use hyper::{Body, Method, Request, client::HttpConnector};
9use hyper_tls::HttpsConnector;
10use rjrpc::{JsonRpcRequest, JsonRpcResponse};
11use serde::{Deserialize, Serialize, de::DeserializeOwned};
12use std::collections::BTreeMap;
13use std::fmt;
14use std::sync::atomic;
15use std::sync::{Arc, Mutex};
16use std::time::Duration;
17
18pub type NodeMap = BTreeMap<String, String>;
19
20pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
21
22static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
23const CT_HEADER: &str = "application/msgpack";
24
25pub const LOCAL: &str = ".local";
26
27#[derive(Deserialize)]
28pub struct SystemInfo {
29 pub system_name: String,
30 pub active: bool,
31 #[serde(flatten)]
32 pub ver: VersionInfo,
33}
34
35#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
36pub struct VersionInfo {
37 pub build: u64,
38 pub version: String,
39}
40
41impl fmt::Display for VersionInfo {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 write!(f, "{} ({})", self.build, self.version)
44 }
45}
46
47impl From<NodeInfo> for VersionInfo {
48 fn from(ni: NodeInfo) -> Self {
49 Self {
50 build: ni.build,
51 version: ni.version,
52 }
53 }
54}
55
56#[inline]
57fn parse_major(ver: &str) -> EResult<u16> {
58 ver.split('.').next().unwrap().parse().map_err(Into::into)
59}
60
61impl VersionInfo {
62 #[inline]
63 pub fn major_matches(&self, ver: &str) -> EResult<bool> {
64 Ok(parse_major(ver)? == self.major()?)
65 }
66 #[inline]
67 pub fn major(&self) -> EResult<u16> {
68 parse_major(&self.version)
69 }
70}
71
72#[allow(clippy::module_name_repetitions)]
73pub struct EvaCloudClient {
74 system_name: String,
75 client: Arc<EvaClient>,
76 node_map: NodeMap,
77}
78
79impl EvaCloudClient {
80 pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
82 Self {
83 system_name: system_name.to_owned(),
84 client: Arc::new(client),
85 node_map,
86 }
87 }
88 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
90 let local_client = EvaClient::connect(path, base_name, config).await?;
91 local_client.transform_into_cloud_client().await
92 }
93 pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
94 let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
95 Ok(info)
96 }
97 pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
98 where
99 T: DeserializeOwned,
100 {
101 self.rpc_call(LOCAL, target, method, None).await
102 }
103 pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
104 where
105 T: DeserializeOwned,
106 V: Serialize,
107 {
108 self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
109 .await
110 }
111 pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
112 where
113 T: DeserializeOwned,
114 {
115 self.rpc_call(node, target, method, None).await
116 }
117 pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
118 where
119 T: DeserializeOwned,
120 V: Serialize,
121 {
122 self.rpc_call(node, target, method, Some(to_value(params)?))
123 .await
124 }
125 pub async fn rpc_call<T>(
126 &self,
127 node: &str,
128 target: &str,
129 method: &str,
130 params: Option<Value>,
131 ) -> EResult<T>
132 where
133 T: DeserializeOwned,
134 {
135 if node == ".local" || node == self.system_name {
136 self.client.call(target, method, params).await
137 } else {
138 let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
139 BTreeMap::deserialize(p).map_err(Error::invalid_data)?
140 } else {
141 BTreeMap::new()
142 };
143 repl_params.insert(
144 Value::String("node".to_owned()),
145 Value::String(node.to_owned()),
146 );
147 self.client
148 .call(
149 self.node_map.get(node).ok_or_else(|| {
150 Error::failed(format!("no replication service mapped for {}", node))
151 })?,
152 &format!("bus::{}::{}", target, method),
153 Some(to_value(repl_params)?),
154 )
155 .await
156 }
157 }
158 #[inline]
159 pub fn client(&self) -> &EvaClient {
160 &self.client
161 }
162 #[inline]
163 pub fn client_cloned(&self) -> Arc<EvaClient> {
164 self.client.clone()
165 }
166}
167
168#[derive(Debug, Clone)]
169pub struct Config {
170 credentials: Option<(String, String)>,
171 token: Option<String>,
172 timeout: Duration,
173}
174
175impl Config {
176 #[inline]
177 pub fn new() -> Self {
178 Self::default()
179 }
180 #[inline]
182 pub fn token(mut self, token: &str) -> Self {
183 self.token = Some(token.to_owned());
184 self
185 }
186 #[inline]
188 pub fn credentials(mut self, login: &str, password: &str) -> Self {
189 self.credentials = Some((login.to_owned(), password.to_owned()));
190 self
191 }
192 #[inline]
193 pub fn timeout(mut self, timeout: Duration) -> Self {
194 self.timeout = timeout;
195 self
196 }
197}
198
199impl Default for Config {
200 #[inline]
201 fn default() -> Self {
202 Self {
203 credentials: None,
204 token: None,
205 timeout: eva_common::DEFAULT_TIMEOUT,
206 }
207 }
208}
209
210#[allow(clippy::module_name_repetitions)]
211pub struct EvaClient {
212 name: String,
213 client: ClientKind,
214 config: Config,
215 token: Mutex<Option<Arc<String>>>,
216 token_preassigned: bool,
217 path: String,
218 request_id: atomic::AtomicU32,
219}
220
221impl EvaClient {
222 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
223 if path.starts_with("http://") || path.starts_with("https://") {
224 let https = HttpsConnector::new();
225 let http_client: hyper::Client<_> = hyper::Client::builder()
226 .pool_idle_timeout(config.timeout)
227 .build(https);
228 let token = config.token.clone();
229 let has_token = token.is_some();
230 let cl = Self {
231 name: base_name.to_owned(),
232 client: ClientKind::Http(http_client),
233 config,
234 token: Mutex::new(token.map(Arc::new)),
235 token_preassigned: has_token,
236 path: path.to_owned(),
237 request_id: atomic::AtomicU32::new(0),
238 };
239 if !has_token && let ClientKind::Http(ref client) = cl.client {
240 cl.http_login(client).await?;
241 }
242 Ok(cl)
243 } else {
244 let name = format!(
245 "{}.{}.{}",
246 base_name,
247 std::process::id(),
248 CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
249 );
250 let bus = ipc::Client::connect(&ipc::Config::new(path, &name).timeout(config.timeout))
251 .await?;
252 let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
253 Ok(Self {
254 name,
255 client: ClientKind::Bus(rpc),
256 config,
257 token: <_>::default(),
258 token_preassigned: false,
259 path: path.to_owned(),
260 request_id: atomic::AtomicU32::new(0),
261 })
262 }
263 }
264 #[inline]
265 pub fn name(&self) -> &str {
266 &self.name
267 }
268 pub async fn get_system_info(&self) -> EResult<SystemInfo> {
269 let info: SystemInfo = self.call0("eva.core", "test").await?;
270 Ok(info)
271 }
272 async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
273 #[derive(Serialize)]
274 struct LoginParams<'a> {
275 u: &'a str,
276 p: &'a str,
277 }
278 #[derive(Deserialize)]
279 struct LoginPayload {
280 token: String,
281 }
282 if let Some(ref creds) = self.config.credentials {
283 let p: LoginPayload = self
284 .safe_http_call(
285 client,
286 None,
287 None,
288 "login",
289 Some(to_value(LoginParams {
290 u: &creds.0,
291 p: &creds.1,
292 })?),
293 )
294 .await?;
295 let token = Arc::new(p.token);
296 self.token.lock().unwrap().replace(token.clone());
297 Ok(token)
298 } else {
299 Err(Error::access("no credentials set"))
300 }
301 }
302 pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
303 #[derive(Deserialize)]
304 struct NodeList {
305 name: String,
306 svc: Option<String>,
307 }
308 let system_name = self.get_system_info().await?.system_name;
309 let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
310 let node_map: NodeMap = node_list
311 .into_iter()
312 .filter_map(|v| v.svc.map(|s| (v.name, s)))
313 .collect();
314 Ok(EvaCloudClient::from_eva_client(
315 &system_name,
316 self,
317 node_map,
318 ))
319 }
320 pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
321 where
322 T: DeserializeOwned,
323 {
324 self.rpc_call(target, method, None::<()>).await
325 }
326 pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
327 where
328 T: DeserializeOwned,
329 V: Serialize,
330 {
331 self.rpc_call(target, method, Some(params)).await
332 }
333 pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
337 where
338 T: DeserializeOwned,
339 V: Serialize,
340 {
341 match self.client {
342 ClientKind::Bus(ref c) => {
343 let payload: busrt::borrow::Cow = if let Some(ref p) = params {
344 pack(p)?.into()
345 } else {
346 busrt::empty_payload!()
347 };
348 let res = tokio::time::timeout(
349 self.config.timeout,
350 c.call(target, method, payload, QoS::Processed),
351 )
352 .await??;
353 let result = res.payload();
354 if result.is_empty() {
355 Ok(T::deserialize(Value::Unit)?)
356 } else {
357 Ok(unpack(result)?)
358 }
359 }
360 ClientKind::Http(ref client) => {
361 let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
362 let params_payload = if let Some(p) = params {
363 Some(to_value(p)?)
364 } else {
365 None
366 };
367 if let Some(token) = to {
368 match self
369 .safe_http_call(
370 client,
371 Some(&token),
372 Some(target),
373 method,
374 params_payload.clone(),
375 )
376 .await
377 {
378 Err(e)
379 if !self.token_preassigned
380 && e.kind() == ErrorKind::AccessDenied
381 && (e.message() == Some("invalid token")) =>
382 {
383 let token = self.http_login(client).await?;
385 self.safe_http_call(
386 client,
387 Some(&token),
388 Some(target),
389 method,
390 params_payload,
391 )
392 .await
393 }
394 res => res,
395 }
396 } else {
397 let token = self.http_login(client).await?;
398 self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
399 .await
400 }
401 }
402 }
403 }
404 async fn safe_http_call<T>(
405 &self,
406 client: &HttpClient,
407 token: Option<&str>,
408 target: Option<&str>,
409 method: &str,
410 params: Option<Value>,
411 ) -> EResult<T>
412 where
413 T: DeserializeOwned,
414 {
415 tokio::time::timeout(
416 self.config.timeout,
417 self.http_call(client, token, target, method, params),
418 )
419 .await?
420 }
421 async fn http_call<T>(
422 &self,
423 client: &HttpClient,
424 token: Option<&str>,
425 target: Option<&str>,
426 method: &str,
427 params: Option<Value>,
428 ) -> EResult<T>
429 where
430 T: DeserializeOwned,
431 {
432 macro_rules! params_map {
433 ($map: expr, $token: expr) => {{
434 $map.insert(
435 Value::String("k".to_owned()),
436 Value::String($token.to_owned()),
437 );
438 Some(Value::Map($map))
439 }};
440 }
441 let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
442 let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
443 let request = JsonRpcRequest::new(
444 Some(Value::U32(id)),
445 if let Some(ref m) = bus_method {
446 m
447 } else {
448 method
449 },
450 if let Some(tk) = token {
451 if let Some(par) = params {
452 let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
453 params_map!(p_map, tk)
454 } else {
455 let mut p_map = BTreeMap::new();
456 params_map!(p_map, tk)
457 }
458 } else {
459 params
460 },
461 rjrpc::Encoding::MsgPack,
462 );
463 let http_request = Request::builder()
464 .method(Method::POST)
465 .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
466 .uri(&self.path)
467 .body(Body::from(request.pack().map_err(Error::invalid_data)?))
468 .map_err(Error::io)?;
469 let http_res = client.request(http_request).await.map_err(Error::io)?;
470 let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
471 let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
472 .map_err(Error::invalid_data)?;
473 if u32::try_from(res.id)? == id {
474 if let Some(err) = res.error {
475 Err(Error::newc(err.code.into(), err.message))
476 } else if let Some(result) = res.result {
477 Ok(T::deserialize(result).map_err(Error::invalid_data)?)
478 } else {
479 Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
480 }
481 } else {
482 Err(Error::io("invalid JRPC response: id mismatch"))
483 }
484 }
485}
486
487enum ClientKind {
488 Bus(rpc::RpcClient),
489 Http(HttpClient),
490}