1use busrt::{
2 ipc,
3 rpc::{self, Rpc},
4 QoS,
5};
6use eva_common::events::NodeInfo;
7use eva_common::payload::{pack, unpack};
8use eva_common::prelude::*;
9use hyper::{client::HttpConnector, Body, Method, Request};
10use hyper_tls::HttpsConnector;
11use rjrpc::{JsonRpcRequest, JsonRpcResponse};
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use std::collections::BTreeMap;
14use std::fmt;
15use std::sync::atomic;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
/// Maps a node name to the name of the service used to reach that node.
///
/// `EvaCloudClient` looks remote nodes up in this map and sends the call to the mapped service.
pub type NodeMap = BTreeMap<String, String>;

/// `hyper` client working over a connector which supports both HTTP and HTTPS.
pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;

/// Process-wide counter; its value becomes the last component of the name of every bus client
/// created by `EvaClient::connect`.
static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
/// Value of the `Content-Type` header sent with every HTTP request.
const CT_HEADER: &str = "application/msgpack";

/// Node name which `EvaCloudClient` treats as the node it is directly connected to.
pub const LOCAL: &str = ".local";
27
28#[derive(Deserialize)]
29pub struct SystemInfo {
30 pub system_name: String,
31 pub active: bool,
32 #[serde(flatten)]
33 pub ver: VersionInfo,
34}
35
/// Build number and version string of a node.
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
pub struct VersionInfo {
    /// Build number.
    pub build: u64,
    /// Version string; the text before the first `.` is treated as the major version.
    pub version: String,
}
41
42impl fmt::Display for VersionInfo {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "{} ({})", self.build, self.version)
45 }
46}
47
48impl From<NodeInfo> for VersionInfo {
49 fn from(ni: NodeInfo) -> Self {
50 Self {
51 build: ni.build,
52 version: ni.version,
53 }
54 }
55}
56
57#[inline]
58fn parse_major(ver: &str) -> EResult<u16> {
59 ver.split('.').next().unwrap().parse().map_err(Into::into)
60}
61
62impl VersionInfo {
63 #[inline]
64 pub fn major_matches(&self, ver: &str) -> EResult<bool> {
65 Ok(parse_major(ver)? == self.major()?)
66 }
67 #[inline]
68 pub fn major(&self) -> EResult<u16> {
69 parse_major(&self.version)
70 }
71}
72
/// Client which can call both the directly connected node and the remote nodes mapped for it.
#[allow(clippy::module_name_repetitions)]
pub struct EvaCloudClient {
    /// Name of the directly connected node; calls addressed to it are not routed.
    system_name: String,
    /// Underlying connection, shared so that it can be handed out with `client_cloned`.
    client: Arc<EvaClient>,
    /// Remote node name -> service which receives the calls addressed to that node.
    node_map: NodeMap,
}
79
80impl EvaCloudClient {
81 pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
83 Self {
84 system_name: system_name.to_owned(),
85 client: Arc::new(client),
86 node_map,
87 }
88 }
89 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
91 let local_client = EvaClient::connect(path, base_name, config).await?;
92 local_client.transform_into_cloud_client().await
93 }
94 pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
95 let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
96 Ok(info)
97 }
98 pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
99 where
100 T: DeserializeOwned,
101 {
102 self.rpc_call(LOCAL, target, method, None).await
103 }
104 pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
105 where
106 T: DeserializeOwned,
107 V: Serialize,
108 {
109 self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
110 .await
111 }
112 pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
113 where
114 T: DeserializeOwned,
115 {
116 self.rpc_call(node, target, method, None).await
117 }
118 pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
119 where
120 T: DeserializeOwned,
121 V: Serialize,
122 {
123 self.rpc_call(node, target, method, Some(to_value(params)?))
124 .await
125 }
126 pub async fn rpc_call<T>(
127 &self,
128 node: &str,
129 target: &str,
130 method: &str,
131 params: Option<Value>,
132 ) -> EResult<T>
133 where
134 T: DeserializeOwned,
135 {
136 if node == ".local" || node == self.system_name {
137 self.client.call(target, method, params).await
138 } else {
139 let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
140 BTreeMap::deserialize(p).map_err(Error::invalid_data)?
141 } else {
142 BTreeMap::new()
143 };
144 repl_params.insert(
145 Value::String("node".to_owned()),
146 Value::String(node.to_owned()),
147 );
148 self.client
149 .call(
150 self.node_map.get(node).ok_or_else(|| {
151 Error::failed(format!("no replication service mapped for {}", node))
152 })?,
153 &format!("bus::{}::{}", target, method),
154 Some(to_value(repl_params)?),
155 )
156 .await
157 }
158 }
159 #[inline]
160 pub fn client(&self) -> &EvaClient {
161 &self.client
162 }
163 #[inline]
164 pub fn client_cloned(&self) -> Arc<EvaClient> {
165 self.client.clone()
166 }
167}
168
/// Connection options of a client.
///
/// `Debug` is implemented by hand so that the password and the token are never printed.
#[derive(Clone)]
pub struct Config {
    /// Login and password used by HTTP clients to obtain a token.
    credentials: Option<(String, String)>,
    /// Token preset by the caller.
    token: Option<String>,
    /// Time limit applied by the clients to connections and calls.
    timeout: Duration,
}

impl fmt::Debug for Config {
    /// Prints the same fields a derived implementation would, with the secrets redacted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let credentials = self
            .credentials
            .as_ref()
            .map(|(login, _)| (login.as_str(), "<redacted>"));
        let token = self.token.as_ref().map(|_| "<redacted>");
        f.debug_struct("Config")
            .field("credentials", &credentials)
            .field("token", &token)
            .field("timeout", &self.timeout)
            .finish()
    }
}
175
176impl Config {
177 #[inline]
178 pub fn new() -> Self {
179 Self::default()
180 }
181 #[inline]
183 pub fn token(mut self, token: &str) -> Self {
184 self.token = Some(token.to_owned());
185 self
186 }
187 #[inline]
189 pub fn credentials(mut self, login: &str, password: &str) -> Self {
190 self.credentials = Some((login.to_owned(), password.to_owned()));
191 self
192 }
193 #[inline]
194 pub fn timeout(mut self, timeout: Duration) -> Self {
195 self.timeout = timeout;
196 self
197 }
198}
199
200impl Default for Config {
201 #[inline]
202 fn default() -> Self {
203 Self {
204 credentials: None,
205 token: None,
206 timeout: eva_common::DEFAULT_TIMEOUT,
207 }
208 }
209}
210
/// Client connected to a single node, either via the bus or via HTTP(S).
#[allow(clippy::module_name_repetitions)]
pub struct EvaClient {
    /// Client name: `<base_name>.<pid>.<counter>` for bus clients, the base name for HTTP ones.
    name: String,
    /// Transport used for the calls.
    client: ClientKind,
    /// Options the client has been created with.
    config: Config,
    /// Current HTTP authentication token; stays `None` for bus clients.
    token: Mutex<Option<Arc<String>>>,
    /// `true` if the token came from the configuration; such a token is never renewed.
    token_preassigned: bool,
    /// Path given to `connect`; used as the request URI by HTTP clients.
    path: String,
    /// Counter which provides the IDs of the HTTP JSON-RPC requests.
    request_id: atomic::AtomicU32,
}
221
222impl EvaClient {
223 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
224 if path.starts_with("http://") || path.starts_with("https://") {
225 let https = HttpsConnector::new();
226 let http_client: hyper::Client<_> = hyper::Client::builder()
227 .pool_idle_timeout(config.timeout)
228 .build(https);
229 let token = config.token.clone();
230 let has_token = token.is_some();
231 let cl = Self {
232 name: base_name.to_owned(),
233 client: ClientKind::Http(http_client),
234 config,
235 token: Mutex::new(token.map(Arc::new)),
236 token_preassigned: has_token,
237 path: path.to_owned(),
238 request_id: atomic::AtomicU32::new(0),
239 };
240 if !has_token {
241 if let ClientKind::Http(ref client) = cl.client {
242 cl.http_login(client).await?;
243 }
244 }
245 Ok(cl)
246 } else {
247 let name = format!(
248 "{}.{}.{}",
249 base_name,
250 std::process::id(),
251 CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
252 );
253 let bus = ipc::Client::connect(&ipc::Config::new(path, &name).timeout(config.timeout))
254 .await?;
255 let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
256 Ok(Self {
257 name,
258 client: ClientKind::Bus(rpc),
259 config,
260 token: <_>::default(),
261 token_preassigned: false,
262 path: path.to_owned(),
263 request_id: atomic::AtomicU32::new(0),
264 })
265 }
266 }
267 #[inline]
268 pub fn name(&self) -> &str {
269 &self.name
270 }
271 pub async fn get_system_info(&self) -> EResult<SystemInfo> {
272 let info: SystemInfo = self.call0("eva.core", "test").await?;
273 Ok(info)
274 }
275 async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
276 #[derive(Serialize)]
277 struct LoginParams<'a> {
278 u: &'a str,
279 p: &'a str,
280 }
281 #[derive(Deserialize)]
282 struct LoginPayload {
283 token: String,
284 }
285 if let Some(ref creds) = self.config.credentials {
286 let p: LoginPayload = self
287 .safe_http_call(
288 client,
289 None,
290 None,
291 "login",
292 Some(to_value(LoginParams {
293 u: &creds.0,
294 p: &creds.1,
295 })?),
296 )
297 .await?;
298 let token = Arc::new(p.token);
299 self.token.lock().unwrap().replace(token.clone());
300 Ok(token)
301 } else {
302 Err(Error::access("no credentials set"))
303 }
304 }
305 pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
306 #[derive(Deserialize)]
307 struct NodeList {
308 name: String,
309 svc: Option<String>,
310 }
311 let system_name = self.get_system_info().await?.system_name;
312 let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
313 let node_map: NodeMap = node_list
314 .into_iter()
315 .filter_map(|v| v.svc.map(|s| (v.name, s)))
316 .collect();
317 Ok(EvaCloudClient::from_eva_client(
318 &system_name,
319 self,
320 node_map,
321 ))
322 }
323 pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
324 where
325 T: DeserializeOwned,
326 {
327 self.rpc_call(target, method, None::<()>).await
328 }
329 pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
330 where
331 T: DeserializeOwned,
332 V: Serialize,
333 {
334 self.rpc_call(target, method, Some(params)).await
335 }
336 pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
340 where
341 T: DeserializeOwned,
342 V: Serialize,
343 {
344 match self.client {
345 ClientKind::Bus(ref c) => {
346 let payload: busrt::borrow::Cow = if let Some(ref p) = params {
347 pack(p)?.into()
348 } else {
349 busrt::empty_payload!()
350 };
351 let res = tokio::time::timeout(
352 self.config.timeout,
353 c.call(target, method, payload, QoS::Processed),
354 )
355 .await??;
356 let result = res.payload();
357 if result.is_empty() {
358 Ok(T::deserialize(Value::Unit)?)
359 } else {
360 Ok(unpack(result)?)
361 }
362 }
363 ClientKind::Http(ref client) => {
364 let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
365 let params_payload = if let Some(p) = params {
366 Some(to_value(p)?)
367 } else {
368 None
369 };
370 if let Some(token) = to {
371 match self
372 .safe_http_call(
373 client,
374 Some(&token),
375 Some(target),
376 method,
377 params_payload.clone(),
378 )
379 .await
380 {
381 Err(e)
382 if !self.token_preassigned
383 && e.kind() == ErrorKind::AccessDenied
384 && (e.message() == Some("invalid token")) =>
385 {
386 let token = self.http_login(client).await?;
388 self.safe_http_call(
389 client,
390 Some(&token),
391 Some(target),
392 method,
393 params_payload,
394 )
395 .await
396 }
397 res => res,
398 }
399 } else {
400 let token = self.http_login(client).await?;
401 self.safe_http_call(client, Some(&token), Some(target), method, params_payload)
402 .await
403 }
404 }
405 }
406 }
407 async fn safe_http_call<T>(
408 &self,
409 client: &HttpClient,
410 token: Option<&str>,
411 target: Option<&str>,
412 method: &str,
413 params: Option<Value>,
414 ) -> EResult<T>
415 where
416 T: DeserializeOwned,
417 {
418 tokio::time::timeout(
419 self.config.timeout,
420 self.http_call(client, token, target, method, params),
421 )
422 .await?
423 }
424 async fn http_call<T>(
425 &self,
426 client: &HttpClient,
427 token: Option<&str>,
428 target: Option<&str>,
429 method: &str,
430 params: Option<Value>,
431 ) -> EResult<T>
432 where
433 T: DeserializeOwned,
434 {
435 macro_rules! params_map {
436 ($map: expr, $token: expr) => {{
437 $map.insert(
438 Value::String("k".to_owned()),
439 Value::String($token.to_owned()),
440 );
441 Some(Value::Map($map))
442 }};
443 }
444 let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
445 let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
446 let request = JsonRpcRequest::new(
447 Some(Value::U32(id)),
448 if let Some(ref m) = bus_method {
449 m
450 } else {
451 method
452 },
453 if let Some(tk) = token {
454 if let Some(par) = params {
455 let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
456 params_map!(p_map, tk)
457 } else {
458 let mut p_map = BTreeMap::new();
459 params_map!(p_map, tk)
460 }
461 } else {
462 params
463 },
464 rjrpc::Encoding::MsgPack,
465 );
466 let http_request = Request::builder()
467 .method(Method::POST)
468 .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
469 .uri(&self.path)
470 .body(Body::from(request.pack().map_err(Error::invalid_data)?))
471 .map_err(Error::io)?;
472 let http_res = client.request(http_request).await.map_err(Error::io)?;
473 let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
474 let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
475 .map_err(Error::invalid_data)?;
476 if u32::try_from(res.id)? == id {
477 if let Some(err) = res.error {
478 Err(Error::newc(err.code.into(), err.message))
479 } else if let Some(result) = res.result {
480 Ok(T::deserialize(result).map_err(Error::invalid_data)?)
481 } else {
482 Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
483 }
484 } else {
485 Err(Error::io("invalid JRPC response: id mismatch"))
486 }
487 }
488}
489
/// Transport used by `EvaClient`.
enum ClientKind {
    /// RPC client working over a bus IPC connection.
    Bus(rpc::RpcClient),
    /// HTTP(S) client; calls are sent as MessagePack-encoded JSON-RPC requests.
    Http(HttpClient),
}