1use busrt::{
2 ipc,
3 rpc::{self, Rpc},
4 QoS,
5};
6use eva_common::events::NodeInfo;
7use eva_common::payload::{pack, unpack};
8use eva_common::prelude::*;
9use hyper::{client::HttpConnector, Body, Method, Request};
10use hyper_tls::HttpsConnector;
11use rjrpc::{JsonRpcRequest, JsonRpcResponse};
12use serde::{de::DeserializeOwned, Deserialize, Serialize};
13use std::collections::BTreeMap;
14use std::fmt;
15use std::sync::atomic;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19pub type NodeMap = BTreeMap<String, String>;
20
21pub type HttpClient = hyper::Client<HttpsConnector<HttpConnector>>;
22
23static CLIENT_ITERATION: atomic::AtomicUsize = atomic::AtomicUsize::new(1);
24const CT_HEADER: &str = "application/msgpack";
25
26pub const LOCAL: &str = ".local";
27
28#[derive(Deserialize)]
29pub struct SystemInfo {
30 pub system_name: String,
31 pub active: bool,
32 #[serde(flatten)]
33 pub ver: VersionInfo,
34}
35
36#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
37pub struct VersionInfo {
38 pub build: u64,
39 pub version: String,
40}
41
42impl fmt::Display for VersionInfo {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "{} ({})", self.build, self.version)
45 }
46}
47
48impl From<NodeInfo> for VersionInfo {
49 fn from(ni: NodeInfo) -> Self {
50 Self {
51 build: ni.build,
52 version: ni.version,
53 }
54 }
55}
56
57#[inline]
58fn parse_major(ver: &str) -> EResult<u16> {
59 ver.split('.').next().unwrap().parse().map_err(Into::into)
60}
61
62impl VersionInfo {
63 #[inline]
64 pub fn major_matches(&self, ver: &str) -> EResult<bool> {
65 Ok(parse_major(ver)? == self.major()?)
66 }
67 #[inline]
68 pub fn major(&self) -> EResult<u16> {
69 parse_major(&self.version)
70 }
71}
72
73#[allow(clippy::module_name_repetitions)]
74pub struct EvaCloudClient {
75 system_name: String,
76 client: Arc<EvaClient>,
77 node_map: NodeMap,
78}
79
80impl EvaCloudClient {
81 pub fn from_eva_client(system_name: &str, client: EvaClient, node_map: NodeMap) -> Self {
83 Self {
84 system_name: system_name.to_owned(),
85 client: Arc::new(client),
86 node_map,
87 }
88 }
89 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
91 let local_client = EvaClient::connect(path, base_name, config).await?;
92 local_client.transform_into_cloud_client().await
93 }
94 pub async fn get_system_info(&self, node: &str) -> EResult<SystemInfo> {
95 let info: SystemInfo = self.call0(node, "eva.core", "test").await?;
96 Ok(info)
97 }
98 pub async fn call_local0<T>(&self, target: &str, method: &str) -> EResult<T>
99 where
100 T: DeserializeOwned,
101 {
102 self.rpc_call(LOCAL, target, method, None).await
103 }
104 pub async fn call_local<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
105 where
106 T: DeserializeOwned,
107 V: Serialize,
108 {
109 self.rpc_call(LOCAL, target, method, Some(to_value(params)?))
110 .await
111 }
112 pub async fn call0<T>(&self, node: &str, target: &str, method: &str) -> EResult<T>
113 where
114 T: DeserializeOwned,
115 {
116 self.rpc_call(node, target, method, None).await
117 }
118 pub async fn call<T, V>(&self, node: &str, target: &str, method: &str, params: V) -> EResult<T>
119 where
120 T: DeserializeOwned,
121 V: Serialize,
122 {
123 self.rpc_call(node, target, method, Some(to_value(params)?))
124 .await
125 }
126 pub async fn rpc_call<T>(
127 &self,
128 node: &str,
129 target: &str,
130 method: &str,
131 params: Option<Value>,
132 ) -> EResult<T>
133 where
134 T: DeserializeOwned,
135 {
136 if node == ".local" || node == self.system_name {
137 self.client.call(target, method, params).await
138 } else {
139 let mut repl_params: BTreeMap<Value, Value> = if let Some(p) = params {
140 BTreeMap::deserialize(p).map_err(Error::invalid_data)?
141 } else {
142 BTreeMap::new()
143 };
144 repl_params.insert(
145 Value::String("node".to_owned()),
146 Value::String(node.to_owned()),
147 );
148 self.client
149 .call(
150 self.node_map.get(node).ok_or_else(|| {
151 Error::failed(format!("no replication service mapped for {}", node))
152 })?,
153 &format!("bus::{}::{}", target, method),
154 Some(to_value(repl_params)?),
155 )
156 .await
157 }
158 }
159 #[inline]
160 pub fn client(&self) -> &EvaClient {
161 &self.client
162 }
163 #[inline]
164 pub fn client_cloned(&self) -> Arc<EvaClient> {
165 self.client.clone()
166 }
167}
168
169#[derive(Debug, Clone)]
170pub struct Config {
171 credentials: Option<(String, String)>,
172 timeout: Duration,
173}
174
175impl Config {
176 #[inline]
177 pub fn new() -> Self {
178 Self::default()
179 }
180 #[inline]
181 pub fn credentials(mut self, login: &str, password: &str) -> Self {
182 self.credentials = Some((login.to_owned(), password.to_owned()));
183 self
184 }
185 #[inline]
186 pub fn timeout(mut self, timeout: Duration) -> Self {
187 self.timeout = timeout;
188 self
189 }
190}
191
192impl Default for Config {
193 #[inline]
194 fn default() -> Self {
195 Self {
196 credentials: None,
197 timeout: eva_common::DEFAULT_TIMEOUT,
198 }
199 }
200}
201
202#[allow(clippy::module_name_repetitions)]
203pub struct EvaClient {
204 name: String,
205 client: ClientKind,
206 config: Config,
207 token: Mutex<Option<Arc<String>>>,
208 path: String,
209 request_id: atomic::AtomicU32,
210}
211
212impl EvaClient {
213 pub async fn connect(path: &str, base_name: &str, config: Config) -> EResult<Self> {
214 if path.starts_with("http://") || path.starts_with("https://") {
215 let https = HttpsConnector::new();
216 let http_client: hyper::Client<_> = hyper::Client::builder()
217 .pool_idle_timeout(config.timeout)
218 .build(https);
219 let cl = Self {
220 name: base_name.to_owned(),
221 client: ClientKind::Http(http_client),
222 config,
223 token: <_>::default(),
224 path: path.to_owned(),
225 request_id: atomic::AtomicU32::new(0),
226 };
227 if let ClientKind::Http(ref client) = cl.client {
228 cl.http_login(client).await?;
229 }
230 Ok(cl)
231 } else {
232 let name = format!(
233 "{}.{}.{}",
234 base_name,
235 std::process::id(),
236 CLIENT_ITERATION.fetch_add(1, atomic::Ordering::SeqCst)
237 );
238 let bus = tokio::time::timeout(
239 config.timeout,
240 ipc::Client::connect(&ipc::Config::new(path, &name)),
241 )
242 .await??;
243 let rpc = rpc::RpcClient::new(bus, rpc::DummyHandlers {});
244 Ok(Self {
245 name,
246 client: ClientKind::Bus(rpc),
247 config,
248 token: <_>::default(),
249 path: path.to_owned(),
250 request_id: atomic::AtomicU32::new(0),
251 })
252 }
253 }
254 #[inline]
255 pub fn name(&self) -> &str {
256 &self.name
257 }
258 pub async fn get_system_info(&self) -> EResult<SystemInfo> {
259 let info: SystemInfo = self.call0("eva.core", "test").await?;
260 Ok(info)
261 }
262 async fn http_login(&self, client: &HttpClient) -> EResult<Arc<String>> {
263 #[derive(Serialize)]
264 struct LoginParams<'a> {
265 u: &'a str,
266 p: &'a str,
267 }
268 #[derive(Deserialize)]
269 struct LoginPayload {
270 token: String,
271 }
272 if let Some(ref creds) = self.config.credentials {
273 let p: LoginPayload = self
274 .safe_http_call(
275 client,
276 None,
277 None,
278 "login",
279 Some(to_value(LoginParams {
280 u: &creds.0,
281 p: &creds.1,
282 })?),
283 )
284 .await?;
285 let token = Arc::new(p.token);
286 self.token.lock().unwrap().replace(token.clone());
287 Ok(token)
288 } else {
289 Err(Error::access("no credentials set"))
290 }
291 }
292 pub async fn transform_into_cloud_client(self) -> EResult<EvaCloudClient> {
293 #[derive(Deserialize)]
294 struct NodeList {
295 name: String,
296 svc: Option<String>,
297 }
298 let system_name = self.get_system_info().await?.system_name;
299 let node_list: Vec<NodeList> = self.call0("eva.core", "node.list").await?;
300 let node_map: NodeMap = node_list
301 .into_iter()
302 .filter_map(|v| v.svc.map(|s| (v.name, s)))
303 .collect();
304 Ok(EvaCloudClient::from_eva_client(
305 &system_name,
306 self,
307 node_map,
308 ))
309 }
310 pub async fn call0<T>(&self, target: &str, method: &str) -> EResult<T>
311 where
312 T: DeserializeOwned,
313 {
314 self.rpc_call(target, method, None::<()>).await
315 }
316 pub async fn call<T, V>(&self, target: &str, method: &str, params: V) -> EResult<T>
317 where
318 T: DeserializeOwned,
319 V: Serialize,
320 {
321 self.rpc_call(target, method, Some(params)).await
322 }
323 pub async fn rpc_call<T, V>(&self, target: &str, method: &str, params: Option<V>) -> EResult<T>
327 where
328 T: DeserializeOwned,
329 V: Serialize,
330 {
331 match self.client {
332 ClientKind::Bus(ref c) => {
333 let payload: busrt::borrow::Cow = if let Some(ref p) = params {
334 pack(p)?.into()
335 } else {
336 busrt::empty_payload!()
337 };
338 let res = tokio::time::timeout(
339 self.config.timeout,
340 c.call(target, method, payload, QoS::Processed),
341 )
342 .await??;
343 let result = res.payload();
344 if result.is_empty() {
345 Ok(T::deserialize(Value::Unit)?)
346 } else {
347 Ok(unpack(result)?)
348 }
349 }
350 ClientKind::Http(ref client) => {
351 let to: Option<Arc<String>> = self.token.lock().unwrap().clone();
352 let params_payload = to_value(params)?;
353 if let Some(token) = to {
354 match self
355 .safe_http_call(
356 client,
357 Some(&token),
358 Some(target),
359 method,
360 Some(params_payload.clone()),
361 )
362 .await
363 {
364 Err(e)
365 if e.kind() == ErrorKind::AccessDenied
366 && e.message().map_or(false, |m| m == "invalid token") =>
367 {
368 let token = self.http_login(client).await?;
370 self.safe_http_call(
371 client,
372 Some(&token),
373 Some(target),
374 method,
375 Some(params_payload),
376 )
377 .await
378 }
379 res => res,
380 }
381 } else {
382 let token = self.http_login(client).await?;
383 self.safe_http_call(
384 client,
385 Some(&token),
386 Some(target),
387 method,
388 Some(params_payload),
389 )
390 .await
391 }
392 }
393 }
394 }
395 async fn safe_http_call<T>(
396 &self,
397 client: &HttpClient,
398 token: Option<&str>,
399 target: Option<&str>,
400 method: &str,
401 params: Option<Value>,
402 ) -> EResult<T>
403 where
404 T: DeserializeOwned,
405 {
406 tokio::time::timeout(
407 self.config.timeout,
408 self.http_call(client, token, target, method, params),
409 )
410 .await?
411 }
412 async fn http_call<T>(
413 &self,
414 client: &HttpClient,
415 token: Option<&str>,
416 target: Option<&str>,
417 method: &str,
418 params: Option<Value>,
419 ) -> EResult<T>
420 where
421 T: DeserializeOwned,
422 {
423 macro_rules! params_map {
424 ($map: expr, $token: expr) => {{
425 $map.insert(
426 Value::String("k".to_owned()),
427 Value::String($token.to_owned()),
428 );
429 Some(Value::Map($map))
430 }};
431 }
432 let id = self.request_id.fetch_add(1, atomic::Ordering::SeqCst);
433 let bus_method = target.map(|tgt| format!("bus::{tgt}::{method}"));
434 let request = JsonRpcRequest::new(
435 Some(Value::U32(id)),
436 if let Some(ref m) = bus_method {
437 m
438 } else {
439 method
440 },
441 if let Some(tk) = token {
442 if let Some(par) = params {
443 let mut p_map: BTreeMap<Value, Value> = BTreeMap::deserialize(par)?;
444 params_map!(p_map, tk)
445 } else {
446 let mut p_map = BTreeMap::new();
447 params_map!(p_map, tk)
448 }
449 } else {
450 params
451 },
452 rjrpc::Encoding::MsgPack,
453 );
454 let http_request = Request::builder()
455 .method(Method::POST)
456 .header(hyper::header::CONTENT_TYPE, CT_HEADER.to_owned())
457 .uri(&self.path)
458 .body(Body::from(request.pack().map_err(Error::invalid_data)?))
459 .map_err(Error::io)?;
460 let http_res = client.request(http_request).await.map_err(Error::io)?;
461 let http_res_body = hyper::body::to_bytes(http_res).await.map_err(Error::io)?;
462 let res = JsonRpcResponse::unpack(&http_res_body, rjrpc::Encoding::MsgPack)
463 .map_err(Error::invalid_data)?;
464 if u32::try_from(res.id)? == id {
465 if let Some(err) = res.error {
466 Err(Error::newc(err.code.into(), err.message))
467 } else if let Some(result) = res.result {
468 Ok(T::deserialize(result).map_err(Error::invalid_data)?)
469 } else {
470 Ok(T::deserialize(Value::Unit).map_err(Error::invalid_data)?)
471 }
472 } else {
473 Err(Error::io("invalid JRPC response: id mismatch"))
474 }
475 }
476}
477
478enum ClientKind {
479 Bus(rpc::RpcClient),
480 Http(HttpClient),
481}