1use std::collections::HashMap;
2use anyhow::Error;
3use serde_derive::{Serialize, Deserialize};
4use serde_json::Value;
5use yew::callback::Callback;
6use yew::prelude::worker::*;
7use yew::services::console::ConsoleService;
8use yew::services::fetch::{self, FetchService, FetchTask};
9use yew::agent::HandlerId;
10use yew::format::Nothing;
11use sp_dto::{Participator, MsgKind, uuid::Uuid, MsgMeta, Route, RouteSpec, CmpSpec, rpc_dto_with_correlation_id, get_msg};
12
13pub struct Worker {
14 link: AgentLink<Worker>,
15 clients: HashMap<String, HandlerId>,
16 fetch_tasks: HashMap<Uuid, FetchTask>
17}
18
19#[derive(Serialize, Deserialize, Debug)]
20pub enum Request {
21 Auth(String),
22 Msg(MsgMeta, Value),
23 Rpc(String, Uuid, Vec<u8>),
24 PostStringRpc(String, Uuid, String),
25 PostBinaryRpc(String, Uuid, Vec<u8>),
26 GetStringRpc(String, Uuid),
27 GetBinaryRpc(String, Uuid)
28}
29
30#[derive(Serialize, Deserialize, Debug)]
31pub enum Response {
32 Msg(MsgMeta, Value),
33 StringRpc(Uuid, String),
34 BinaryRpc(Uuid, Vec<u8>)
35}
36
37pub enum Msg {
38 RpcReady(Vec<u8>, Uuid, String),
39 StringRpcReady(String, Uuid, String),
40 BinaryRpcReady(Vec<u8>, Uuid, String),
41 FetchError(Error)
42}
43
44impl Worker {
45 pub fn get_cb(&self, correlation_id: Uuid, client_addr: String) -> yew::Callback<fetch::Response<Result<Vec<u8>, Error>>> {
46
47 self.link.callback(move |response: fetch::Response<Result<Vec<u8>, Error>>| {
48 let (_, data) = response.into_parts();
49 match data {
50 Ok(data) => Msg::RpcReady(data, correlation_id, client_addr.clone()),
51 Err(err) => Msg::FetchError(err)
52 }
53 })
54 }
55 pub fn get_string_cb(&self, correlation_id: Uuid, client_addr: String) -> yew::Callback<fetch::Response<Result<String, Error>>> {
56
57 self.link.callback(move |response: fetch::Response<Result<String, Error>>| {
58 let (_, data) = response.into_parts();
59 match data {
60 Ok(data) => Msg::StringRpcReady(data, correlation_id, client_addr.clone()),
61 Err(err) => Msg::FetchError(err)
62 }
63 })
64 }
65 pub fn get_binary_cb(&self, correlation_id: Uuid, client_addr: String) -> yew::Callback<fetch::Response<Result<Vec<u8>, Error>>> {
66
67 self.link.callback(move |response: fetch::Response<Result<Vec<u8>, Error>>| {
68 let (_, data) = response.into_parts();
69 match data {
70 Ok(data) => Msg::BinaryRpcReady(data, correlation_id, client_addr.clone()),
71 Err(err) => Msg::FetchError(err)
72 }
73 })
74 }
75}
76
77impl Agent for Worker {
78 type Reach = Context<Self>;
83 type Message = Msg;
84 type Input = Request;
85 type Output = Response;
86 fn create(link: AgentLink<Self>) -> Self {
88 ConsoleService::log("hub created");
89
90 Worker {
91 link,
92 clients: HashMap::new(),
93 fetch_tasks: HashMap::new()
94 }
95 }
96 fn update(&mut self, msg: Self::Message) {
98 match msg {
100 Msg::RpcReady(data, correlation_id, addr) => {
101 let (msg_meta, payload, _) = get_msg::<Value>(&data).expect("failed to get msg on FetchReady");
102 self.fetch_tasks.remove(&correlation_id);
103 match msg_meta.kind {
104 MsgKind::RpcResponse(_) => {
105 match msg_meta.route.spec {
106 RouteSpec::Simple => {
107 match msg_meta.source_cmp_addr() {
108 Some(source_addr) => {
109 match source_addr == addr {
110 true => {
111 match self.clients.get(&addr) {
112 Some(client_id) => self.link.respond(*client_id, Response::Msg(msg_meta, payload)),
113 None => ConsoleService::log(&format!("hub: missing client {}", addr))
114 }
115 }
116 false => {
117 ConsoleService::log(&format!("error: message source addr differ from real source, message not delivered, {} vs {} (real one). This is possible security issue, please note.", source_addr, addr));
118 }
119 }
120 }
121 None => {
122 ConsoleService::log("error: source cmp empty for this rpc");
123 }
124 }
125 }
126 RouteSpec::Client(_) => {
127 match msg_meta.client_cmp_addr() {
128 Some(addr) => {
129 match self.clients.get(&addr) {
130 Some(client_id) => self.link.respond(*client_id, Response::Msg(msg_meta, payload)),
131 None => ConsoleService::log(&format!("hub: missing client {}", addr))
132 }
133 }
134 None => {
135 ConsoleService::log("error: client cmp empty for this rpc");
136 }
137 }
138 }
139 }
140 }
141 _ => {}
142 }
143 }
144 Msg::StringRpcReady(data, correlation_id, addr) => {
145 self.fetch_tasks.remove(&correlation_id);
146 match self.clients.get(&addr) {
147 Some(client_id) => self.link.respond(*client_id, Response::StringRpc(correlation_id, data)),
148 None => ConsoleService::log(&format!("hub: missing client {}", addr))
149 }
150 }
151 Msg::BinaryRpcReady(data, correlation_id, addr) => {
152 self.fetch_tasks.remove(&correlation_id);
153 match self.clients.get(&addr) {
154 Some(client_id) => self.link.respond(*client_id, Response::BinaryRpc(correlation_id, data)),
155 None => ConsoleService::log(&format!("hub: missing client {}", addr))
156 }
157 }
158 Msg::FetchError(err) => {
159 ConsoleService::log(&format!("error: fetch, {:?}", err));
160 }
161 }
162 }
163 fn handle_input(&mut self, msg: Self::Input, who: HandlerId) {
165 match msg {
167 Request::Auth(addr) => {
168 ConsoleService::log(&format!("hub auth: {}", addr));
169 self.clients.insert(addr, who);
170 }
171 Request::Msg(msg_meta, payload) => {
172 match self.clients.get(&msg_meta.rx) {
173 Some(client_id) => self.link.respond(*client_id, Response::Msg(msg_meta, payload)),
174 None => ConsoleService::log(&format!("hub: missing client {}", msg_meta.rx))
175 }
176 }
177 Request::Rpc(url, correlation_id, data) => {
178 match self.clients.iter().find(|(_, x)| **x == who) {
179 Some((addr, _)) => {
180
181 let request = fetch::Request::post(url)
182 .body(Ok(data))
183 .expect("Failed to build request.");
184
185 match FetchService::fetch_binary(request, self.get_cb(correlation_id, addr.clone())) {
186 Ok(task) => {
187 let _ = self.fetch_tasks.insert(correlation_id, task);
188 }
189 Err(e) => ConsoleService::log(&format!("error: error on fetch, {}", e))
190 }
191 }
192 None => {
193 ConsoleService::log("error: client not found by handler id")
194 }
195 }
196 }
197 Request::PostStringRpc(url, correlation_id, data) => {
198 match self.clients.iter().find(|(_, x)| **x == who) {
199 Some((addr, _)) => {
200
201 let request = fetch::Request::post(url)
202 .body(Ok(data))
203 .expect("Failed to build request.");
204
205 match FetchService::fetch(request, self.get_string_cb(correlation_id, addr.clone())) {
206 Ok(task) => {
207 let _ = self.fetch_tasks.insert(correlation_id, task);
208 }
209 Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
210 }
211 }
212 None => {
213 ConsoleService::log("error: client not found by handler id")
214 }
215 }
216 }
217 Request::PostBinaryRpc(url, correlation_id, data) => {
218 match self.clients.iter().find(|(_, x)| **x == who) {
219 Some((addr, _)) => {
220
221 let request = fetch::Request::post(url)
222 .body(Ok(data))
223 .expect("Failed to build request.");
224
225 match FetchService::fetch_binary(request, self.get_binary_cb(correlation_id, addr.clone())) {
226 Ok(task) => {
227 let _ = self.fetch_tasks.insert(correlation_id, task);
228 }
229 Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
230 }
231 }
232 None => {
233 ConsoleService::log("error: client not found by handler id")
234 }
235 }
236 }
237 Request::GetStringRpc(url, correlation_id) => {
238 match self.clients.iter().find(|(_, x)| **x == who) {
239 Some((addr, _)) => {
240
241 let request = fetch::Request::get(url)
242 .body(Nothing)
243 .expect("Failed to build request.");
244
245 match FetchService::fetch(request, self.get_string_cb(correlation_id, addr.clone())) {
246 Ok(task) => {
247 let _ = self.fetch_tasks.insert(correlation_id, task);
248 }
249 Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
250 }
251 }
252 None => {
253 ConsoleService::log("error: client not found by handler id")
254 }
255 }
256 }
257 Request::GetBinaryRpc(url, correlation_id) => {
258 match self.clients.iter().find(|(_, x)| **x == who) {
259 Some((addr, _)) => {
260
261 let request = fetch::Request::post(url)
262 .body(Nothing)
263 .expect("Failed to build request.");
264
265 match FetchService::fetch_binary(request, self.get_binary_cb(correlation_id, addr.clone())) {
266 Ok(task) => {
267 let _ = self.fetch_tasks.insert(correlation_id, task);
268 }
269 Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
270 }
271 }
272 None => {
273 ConsoleService::log("error: client not found by handler id")
274 }
275 }
276 }
277 }
278 }
279}
280
281pub struct Hub {
283 hub: Box<dyn Bridge<Worker>>,
284 pub spec: CmpSpec,
285 pub cfg: HubCfg
286}
287
288#[derive(Clone, PartialEq)]
290pub struct HubCfg {
291 pub app_addr: Option<String>,
292 pub client_addr: Option<String>,
293 pub host: Option<String>,
294 pub fetch_url: Option<String>,
295 pub ws_url: Option<String>,
296 pub domain: Option<String>,
297 pub auth_token: Option<String>,
298 pub auth_data: Option<Value>
299}
300
301impl Default for HubCfg {
302 fn default() -> Self {
303 HubCfg {
304 app_addr: None,
305 client_addr: None,
306 host: None,
307 fetch_url: None,
308 ws_url: None,
309 domain: None,
310 auth_token: None,
311 auth_data: None
312 }
313 }
314}
315
316impl Hub {
317 pub fn new(spec: CmpSpec, cfg: HubCfg, callback: Callback<Response>) -> Hub {
318 let mut hub = Worker::bridge(callback);
319 hub.send(Request::Auth(spec.addr.clone()));
320 Hub {
321 hub,
322 spec,
323 cfg
324 }
325 }
326 pub fn new_no_auth(spec: CmpSpec, cfg: HubCfg, callback: Callback<Response>) -> Hub {
327 Hub {
328 hub: Worker::bridge(callback),
329 spec,
330 cfg
331 }
332 }
333 pub fn auth(&mut self, spec: CmpSpec, cfg: HubCfg) {
334 self.spec = spec;
335 self.cfg = cfg;
336 self.hub.send(Request::Auth(self.spec.addr.clone()));
337 }
338 pub fn rpc(&mut self, addr: &str, key: &str, payload: Value) {
340 let route = Route {
341 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
342 spec: RouteSpec::Simple,
343 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
344 };
345 let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr.to_owned(), key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
346 let url = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
347 self.hub.send(Request::Rpc(url, correlation_id, dto));
348 }
349 pub fn rpc_with_client(&mut self, addr: &str, key: &str, payload: Value, client_addr: String) {
351 let route = Route {
352 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
353 spec: RouteSpec::Client(Participator::Component(client_addr, self.cfg.app_addr.clone(), self.cfg.client_addr.clone())),
354 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
355 };
356 let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr.to_owned(), key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
357 let host = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
358 self.hub.send(Request::Rpc(host, correlation_id, dto));
359 }
360 pub fn rpc_with_domain(&mut self, addr: &str, key: &str, payload: Value) {
362 match &self.cfg.domain {
363 Some(domain) => {
364 let addr = addr.to_owned() + "." + domain;
365 let route = Route {
366 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
367 spec: RouteSpec::Simple,
368 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
369 };
370 let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr, key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
371 let host = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
372
373 self.hub.send(Request::Rpc(host, correlation_id, dto));
374 }
375 None => panic!(format!("domain is empty on server rpc with domain call {}", self.spec.addr))
376 }
377 }
378 pub fn rpc_with_segment(&mut self, segment: &str, addr: &str, key: &str, payload: Value) {
380 let route = Route {
381 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
382 spec: RouteSpec::Simple,
383 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
384 };
385 let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr.to_owned(), key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
386 let url = self.cfg.host.clone().expect("fetch host is empty on server rpc") + "/" + segment + "/";
387 self.hub.send(Request::Rpc(url, correlation_id, dto));
388 }
389 pub fn send_event_local(&mut self, addr: &str, key: &str, payload: Value) {
391 self.hub.send(Request::Msg(
392 MsgMeta {
393 tx: self.spec.addr.clone(),
394 rx: addr.to_owned(),
395 key: key.to_owned(),
396 kind: MsgKind::Event,
397 correlation_id: Uuid::new_v4(),
398 route: Route {
399 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
400 spec: RouteSpec::Simple,
401 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
402 },
403 payload_size: 0,
404 auth_token: self.cfg.auth_token.clone(),
405 auth_data: self.cfg.auth_data.clone(),
406 attachments: vec![]
407 },
408 payload
409 ));
410 }
411 pub fn send_rpc_local(&mut self, rx: &str, key: &str, payload: Value) {
413 self.hub.send(Request::Msg(
414 MsgMeta {
415 tx: self.spec.addr.clone(),
416 rx: rx.to_owned(),
417 key: key.to_owned(),
418 kind: MsgKind::RpcRequest,
419 correlation_id: Uuid::new_v4(),
420 route: Route {
421 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
422 spec: RouteSpec::Simple,
423 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
424 },
425 payload_size: 0,
426 auth_token: self.cfg.auth_token.clone(),
427 auth_data: self.cfg.auth_data.clone(),
428 attachments: vec![]
429 },
430 payload
431 ));
432 }
433 pub fn proxy_msg_local(&mut self, rx: &str, mut msg_meta: MsgMeta, payload: Value) {
435 msg_meta.route.points.push(Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()));
436
437 self.hub.send(Request::Msg(
438 MsgMeta {
439 tx: self.spec.addr.clone(),
440 rx: rx.to_owned(),
441 key: msg_meta.key,
442 kind: msg_meta.kind,
443 correlation_id: msg_meta.correlation_id,
444 route: msg_meta.route,
445 payload_size: 0,
446 auth_token: self.cfg.auth_token.clone(),
447 auth_data: self.cfg.auth_data.clone(),
448 attachments: vec![]
449 },
450 payload
451 ));
452 }
453 pub fn send_event_tx(&mut self, key: &str, payload: Value) {
455 self.hub.send(Request::Msg(
456 MsgMeta {
457 tx: self.spec.addr.clone(),
458 rx: self.spec.tx.clone(),
459 key: key.to_owned(),
460 kind: MsgKind::Event,
461 correlation_id: Uuid::new_v4(),
462 route: Route {
463 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
464 spec: RouteSpec::Simple,
465 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
466 },
467 payload_size: 0,
468 auth_token: self.cfg.auth_token.clone(),
469 auth_data: self.cfg.auth_data.clone(),
470 attachments: vec![]
471 },
472 payload
473 ));
474 }
475 pub fn send_rpc_tx(&mut self, key: &str, payload: Value) {
477 self.hub.send(Request::Msg(
478 MsgMeta {
479 tx: self.spec.addr.clone(),
480 rx: self.spec.tx.clone(),
481 key: key.to_owned(),
482 kind: MsgKind::RpcRequest,
483 correlation_id: Uuid::new_v4(),
484 route: Route {
485 source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
486 spec: RouteSpec::Simple,
487 points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
488 },
489 payload_size: 0,
490 auth_token: self.cfg.auth_token.clone(),
491 auth_data: self.cfg.auth_data.clone(),
492 attachments: vec![]
493 },
494 payload
495 ));
496 }
497 pub fn proxy_msg_tx(&mut self, mut msg_meta: MsgMeta, payload: Value) {
499 msg_meta.route.points.push(Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()));
500 self.hub.send(Request::Msg(
501 MsgMeta {
502 tx: self.spec.addr.clone(),
503 rx: self.spec.tx.clone(),
504 key: msg_meta.key,
505 kind: msg_meta.kind,
506 correlation_id: msg_meta.correlation_id,
507 route: msg_meta.route,
508 payload_size: 0,
509 auth_token: self.cfg.auth_token.clone(),
510 auth_data: self.cfg.auth_data.clone(),
511 attachments: vec![]
512 },
513 payload
514 ));
515 }
516
517 pub fn rpc_post_string(&mut self, payload: String) {
519 let url = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
520 self.hub.send(Request::PostStringRpc(url, Uuid::new_v4(), payload));
521 }
522
523 pub fn rpc_post_binary(&mut self, payload: Vec<u8>) {
525 let url = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
526 self.hub.send(Request::PostBinaryRpc(url, Uuid::new_v4(), payload));
527 }
528
529 pub fn rpc_post_string_custom_url(&mut self, url: String, payload: String) {
531 self.hub.send(Request::PostStringRpc(url, Uuid::new_v4(), payload));
532 }
533
534 pub fn rpc_post_binary_custom_url(&mut self, url: String, payload: Vec<u8>) {
536 self.hub.send(Request::PostBinaryRpc(url, Uuid::new_v4(), payload));
537 }
538
539 pub fn rpc_get_string_custom_url(&mut self, url: String) {
541 self.hub.send(Request::GetStringRpc(url, Uuid::new_v4()));
542 }
543
544 pub fn rpc_get_binary_custom_url(&mut self, url: String) {
546 self.hub.send(Request::GetBinaryRpc(url, Uuid::new_v4()));
547 }
548}