sp_yew/
hub.rs

1use std::collections::HashMap;
2use anyhow::Error;
3use serde_derive::{Serialize, Deserialize};
4use serde_json::Value;
5use yew::callback::Callback;
6use yew::prelude::worker::*;
7use yew::services::console::ConsoleService;
8use yew::services::fetch::{self, FetchService, FetchTask};
9use yew::agent::HandlerId;
10use yew::format::Nothing;
11use sp_dto::{Participator, MsgKind, uuid::Uuid, MsgMeta, Route, RouteSpec, CmpSpec, rpc_dto_with_correlation_id, get_msg};
12
13pub struct Worker {
14    link: AgentLink<Worker>,
15    clients: HashMap<String, HandlerId>,    
16    fetch_tasks: HashMap<Uuid, FetchTask>
17}
18
19#[derive(Serialize, Deserialize, Debug)]
20pub enum Request {
21    Auth(String),
22    Msg(MsgMeta, Value),
23    Rpc(String, Uuid, Vec<u8>),
24    PostStringRpc(String, Uuid, String),
25    PostBinaryRpc(String, Uuid, Vec<u8>),
26    GetStringRpc(String, Uuid),
27    GetBinaryRpc(String, Uuid)
28}
29
30#[derive(Serialize, Deserialize, Debug)]
31pub enum Response {
32    Msg(MsgMeta, Value),
33    StringRpc(Uuid, String),
34    BinaryRpc(Uuid, Vec<u8>)
35}
36
37pub enum Msg {    
38    RpcReady(Vec<u8>, Uuid, String),
39    StringRpcReady(String, Uuid, String),
40    BinaryRpcReady(Vec<u8>, Uuid, String),
41    FetchError(Error)
42}
43
44impl Worker {
45    pub fn get_cb(&self, correlation_id: Uuid, client_addr: String) -> yew::Callback<fetch::Response<Result<Vec<u8>, Error>>> {
46
47        self.link.callback(move |response: fetch::Response<Result<Vec<u8>, Error>>| {            
48            let (_, data) = response.into_parts();
49            match data {
50                Ok(data) => Msg::RpcReady(data, correlation_id, client_addr.clone()),
51                Err(err) => Msg::FetchError(err)
52            }            
53        })
54    }
55    pub fn get_string_cb(&self, correlation_id: Uuid, client_addr: String) -> yew::Callback<fetch::Response<Result<String, Error>>> {
56
57        self.link.callback(move |response: fetch::Response<Result<String, Error>>| {            
58            let (_, data) = response.into_parts();
59            match data {
60                Ok(data) => Msg::StringRpcReady(data, correlation_id, client_addr.clone()),
61                Err(err) => Msg::FetchError(err)
62            }            
63        })
64    }
65    pub fn get_binary_cb(&self, correlation_id: Uuid, client_addr: String) -> yew::Callback<fetch::Response<Result<Vec<u8>, Error>>> {
66
67        self.link.callback(move |response: fetch::Response<Result<Vec<u8>, Error>>| {
68            let (_, data) = response.into_parts();
69            match data {
70                Ok(data) => Msg::BinaryRpcReady(data, correlation_id, client_addr.clone()),
71                Err(err) => Msg::FetchError(err)
72            }            
73        })
74    }
75}
76
77impl Agent for Worker {
78    // Available:
79    // - `Job` (one per bridge)
80    // - `Context` (shared in the same thread)
81    // - `Public` (separate thread).
82    type Reach = Context<Self>;
83    type Message = Msg;
84    type Input = Request;
85    type Output = Response;
86    // Create an instance with a link to agent's environment.
87    fn create(link: AgentLink<Self>) -> Self {        
88        ConsoleService::log("hub created");
89
90        Worker { 
91            link,
92            clients: HashMap::new(),                        
93            fetch_tasks: HashMap::new()
94        }
95    }
96    // Handle inner messages (of services of `send_back` callbacks)
97    fn update(&mut self, msg: Self::Message) {
98        //self.console.log("hub: got update");
99        match msg {
100            Msg::RpcReady(data, correlation_id, addr) => {
101                let (msg_meta, payload, _) = get_msg::<Value>(&data).expect("failed to get msg on FetchReady");
102                self.fetch_tasks.remove(&correlation_id);
103                match msg_meta.kind {
104                    MsgKind::RpcResponse(_) => {
105                        match msg_meta.route.spec {
106                            RouteSpec::Simple => {
107                                match msg_meta.source_cmp_addr() {
108                                    Some(source_addr) => {
109                                        match source_addr == addr {
110                                            true => {
111                                                match self.clients.get(&addr) {
112                                                    Some(client_id) => self.link.respond(*client_id, Response::Msg(msg_meta, payload)),
113                                                    None => ConsoleService::log(&format!("hub: missing client {}", addr))
114                                                }
115                                            }
116                                            false => {
117                                                ConsoleService::log(&format!("error: message source addr differ from real source, message not delivered, {} vs {} (real one). This is possible security issue, please note.", source_addr, addr));
118                                            }
119                                        }                                        
120                                    }
121                                    None => {
122                                        ConsoleService::log("error: source cmp empty for this rpc");
123                                    }
124                                }
125                            }
126                            RouteSpec::Client(_) => {
127                                match msg_meta.client_cmp_addr() {
128                                    Some(addr) => {
129                                        match self.clients.get(&addr) {
130                                            Some(client_id) => self.link.respond(*client_id, Response::Msg(msg_meta, payload)),
131                                            None => ConsoleService::log(&format!("hub: missing client {}", addr))
132                                        }
133                                    }
134                                    None => {
135                                        ConsoleService::log("error: client cmp empty for this rpc");
136                                    }
137                                }
138                            }
139                        }                                                
140                    }
141                    _ => {}
142                }                
143            }
144            Msg::StringRpcReady(data, correlation_id, addr) => {
145                self.fetch_tasks.remove(&correlation_id);
146                match self.clients.get(&addr) {
147                    Some(client_id) => self.link.respond(*client_id, Response::StringRpc(correlation_id, data)),
148                    None => ConsoleService::log(&format!("hub: missing client {}", addr))
149                }
150            }
151            Msg::BinaryRpcReady(data, correlation_id, addr) => {
152                self.fetch_tasks.remove(&correlation_id);
153                match self.clients.get(&addr) {
154                    Some(client_id) => self.link.respond(*client_id, Response::BinaryRpc(correlation_id, data)),
155                    None => ConsoleService::log(&format!("hub: missing client {}", addr))
156                }
157            }
158            Msg::FetchError(err) => {
159                ConsoleService::log(&format!("error: fetch, {:?}", err));
160            }
161        }
162    }
163    // Handle incoming messages form components of other agents.
164    fn handle_input(&mut self, msg: Self::Input, who: HandlerId) {
165        //self.console.log(&format!("hub: {:?}", msg));        
166        match msg {
167            Request::Auth(addr) => {
168                ConsoleService::log(&format!("hub auth: {}", addr));
169                self.clients.insert(addr, who);
170            }
171            Request::Msg(msg_meta, payload) => {
172                match self.clients.get(&msg_meta.rx) {
173                    Some(client_id) => self.link.respond(*client_id, Response::Msg(msg_meta, payload)),
174                    None => ConsoleService::log(&format!("hub: missing client {}", msg_meta.rx))
175                }
176            }
177            Request::Rpc(url, correlation_id, data) => {
178                match self.clients.iter().find(|(_, x)| **x == who) {
179                    Some((addr, _)) => {
180
181                        let request = fetch::Request::post(url)        
182                            .body(Ok(data))
183                            .expect("Failed to build request.");
184
185                        match FetchService::fetch_binary(request, self.get_cb(correlation_id, addr.clone())) {
186                            Ok(task) => {
187                                let _ = self.fetch_tasks.insert(correlation_id, task);
188                            }
189                            Err(e) => ConsoleService::log(&format!("error: error on fetch, {}", e))
190                        }
191                    }
192                    None => {
193                        ConsoleService::log("error: client not found by handler id")
194                    }
195                }                
196            }
197            Request::PostStringRpc(url, correlation_id, data) => {
198                match self.clients.iter().find(|(_, x)| **x == who) {
199                    Some((addr, _)) => {
200
201                        let request = fetch::Request::post(url)        
202                            .body(Ok(data))
203                            .expect("Failed to build request.");
204                            
205                        match FetchService::fetch(request, self.get_string_cb(correlation_id, addr.clone())) {
206                            Ok(task) => {
207                                let _ = self.fetch_tasks.insert(correlation_id, task);
208                            }
209                            Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
210                        }
211                    }
212                    None => {
213                        ConsoleService::log("error: client not found by handler id")
214                    }
215                }
216            }
217            Request::PostBinaryRpc(url, correlation_id, data) => {
218                match self.clients.iter().find(|(_, x)| **x == who) {
219                    Some((addr, _)) => {
220
221                        let request = fetch::Request::post(url)        
222                            .body(Ok(data))
223                            .expect("Failed to build request.");
224
225                        match FetchService::fetch_binary(request, self.get_binary_cb(correlation_id, addr.clone())) {
226                            Ok(task) => {
227                                let _ = self.fetch_tasks.insert(correlation_id, task);
228                            }
229                            Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
230                        }
231                    }
232                    None => {
233                        ConsoleService::log("error: client not found by handler id")
234                    }
235                }
236            }
237            Request::GetStringRpc(url, correlation_id) => {
238                match self.clients.iter().find(|(_, x)| **x == who) {
239                    Some((addr, _)) => {
240
241                        let request = fetch::Request::get(url)
242                            .body(Nothing)
243                            .expect("Failed to build request.");
244                            
245                        match FetchService::fetch(request, self.get_string_cb(correlation_id, addr.clone())) {
246                            Ok(task) => {
247                                let _ = self.fetch_tasks.insert(correlation_id, task);
248                            }
249                            Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
250                        }
251                    }
252                    None => {
253                        ConsoleService::log("error: client not found by handler id")
254                    }
255                }
256            }
257            Request::GetBinaryRpc(url, correlation_id) => {
258                match self.clients.iter().find(|(_, x)| **x == who) {
259                    Some((addr, _)) => {
260
261                        let request = fetch::Request::post(url)        
262                            .body(Nothing)
263                            .expect("Failed to build request.");
264
265                        match FetchService::fetch_binary(request, self.get_binary_cb(correlation_id, addr.clone())) {
266                            Ok(task) => {
267                                let _ = self.fetch_tasks.insert(correlation_id, task);
268                            }
269                            Err(e) => ConsoleService::log(&format!("error: error on fetch, simple string rpc, {}", e))
270                        }
271                    }
272                    None => {
273                        ConsoleService::log("error: client not found by handler id")
274                    }
275                }
276            }
277        }
278    }
279}
280
281/// This struct is attached to components and used for communication between components and also a sever.
282pub struct Hub {
283    hub: Box<dyn Bridge<Worker>>,
284    pub spec: CmpSpec,
285    pub cfg: HubCfg    
286}
287
288/// Configuration for various communication scenarios.
289#[derive(Clone, PartialEq)]
290pub struct HubCfg {
291    pub app_addr: Option<String>,
292    pub client_addr: Option<String>,
293    pub host: Option<String>,
294    pub fetch_url: Option<String>,
295    pub ws_url: Option<String>,
296    pub domain: Option<String>,
297    pub auth_token: Option<String>,
298    pub auth_data: Option<Value>
299}
300
301impl Default for HubCfg {
302    fn default() -> Self {
303        HubCfg {
304            app_addr: None,
305            client_addr: None,
306            host: None,
307            fetch_url: None,
308            ws_url: None,
309            domain: None,
310            auth_token: None,
311            auth_data: None
312        }        
313    }
314}
315
316impl Hub {
317    pub fn new(spec: CmpSpec, cfg: HubCfg, callback: Callback<Response>) -> Hub {
318        let mut hub = Worker::bridge(callback);
319        hub.send(Request::Auth(spec.addr.clone()));
320        Hub {
321            hub,
322            spec,
323            cfg            
324        }        
325    }
326    pub fn new_no_auth(spec: CmpSpec, cfg: HubCfg, callback: Callback<Response>) -> Hub {
327        Hub {
328            hub: Worker::bridge(callback),
329            spec,
330            cfg            
331        }        
332    }
333    pub fn auth(&mut self, spec: CmpSpec, cfg: HubCfg) {
334        self.spec = spec;
335        self.cfg = cfg;
336        self.hub.send(Request::Auth(self.spec.addr.clone()));
337    }
338    /// Sends rpc request to the server
339    pub fn rpc(&mut self, addr: &str, key: &str, payload: Value) {
340        let route = Route {
341            source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
342            spec: RouteSpec::Simple,
343            points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
344        };
345        let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr.to_owned(), key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
346        let url = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
347        self.hub.send(Request::Rpc(url, correlation_id, dto));
348    }
349    /// Sends rpc request to the server, but result will forwarded to component with client_addr
350    pub fn rpc_with_client(&mut self, addr: &str, key: &str, payload: Value, client_addr: String) {
351        let route = Route {
352            source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
353            spec: RouteSpec::Client(Participator::Component(client_addr, self.cfg.app_addr.clone(), self.cfg.client_addr.clone())),
354            points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
355        };
356        let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr.to_owned(), key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
357        let host = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
358        self.hub.send(Request::Rpc(host, correlation_id, dto));
359    }
360    /// Sends rpc request to the server, iserting domain value form config in to target addr.
361    pub fn rpc_with_domain(&mut self, addr: &str, key: &str, payload: Value) {
362        match &self.cfg.domain {
363            Some(domain) => {
364                let addr = addr.to_owned() + "." + domain;
365                let route = Route {
366                    source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
367                    spec: RouteSpec::Simple,
368                    points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
369                };
370                let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr, key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
371                let host = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
372
373                self.hub.send(Request::Rpc(host, correlation_id, dto));
374            }
375            None => panic!(format!("domain is empty on server rpc with domain call {}", self.spec.addr))
376        }        
377    }
378    /// Sends rpc request to the server, iserting url segment to the resulting url.
379    pub fn rpc_with_segment(&mut self, segment: &str, addr: &str, key: &str, payload: Value) {
380        let route = Route {
381            source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
382            spec: RouteSpec::Simple,
383            points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
384        };
385        let (correlation_id, dto) = rpc_dto_with_correlation_id(self.spec.addr.clone(), addr.to_owned(), key.to_owned(), payload, route, self.cfg.auth_token.clone(), self.cfg.auth_data.clone()).expect("failed to create rpc dto with correlation id on server rpc");
386        let url = self.cfg.host.clone().expect("fetch host is empty on server rpc") + "/" + segment + "/";
387        self.hub.send(Request::Rpc(url, correlation_id, dto));
388    }
389    /// Sends message marked as event to other component.
390    pub fn send_event_local(&mut self, addr: &str, key: &str, payload: Value) {
391        self.hub.send(Request::Msg(
392            MsgMeta {
393                tx: self.spec.addr.clone(),
394                rx: addr.to_owned(),
395                key: key.to_owned(),
396                kind: MsgKind::Event,
397                correlation_id: Uuid::new_v4(),
398                route: Route {
399                    source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
400                    spec: RouteSpec::Simple,
401                    points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
402                },
403                payload_size: 0,
404                auth_token: self.cfg.auth_token.clone(),
405                auth_data: self.cfg.auth_data.clone(),
406                attachments: vec![]
407            }, 
408            payload
409        ));
410    }
411    /// Sends message marked as rpc request to other component.
412    pub fn send_rpc_local(&mut self, rx: &str, key: &str, payload: Value) {
413        self.hub.send(Request::Msg(
414            MsgMeta {
415                tx: self.spec.addr.clone(),
416                rx: rx.to_owned(),
417                key: key.to_owned(),
418                kind: MsgKind::RpcRequest,
419                correlation_id: Uuid::new_v4(),
420                route: Route {
421                    source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
422                    spec: RouteSpec::Simple,
423                    points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
424                },
425                payload_size: 0,
426                auth_token: self.cfg.auth_token.clone(),
427                auth_data: self.cfg.auth_data.clone(),
428                attachments: vec![]
429            },
430            payload
431        ));
432    }
433    /// Forwards current message to other component.
434    pub fn proxy_msg_local(&mut self, rx: &str, mut msg_meta: MsgMeta, payload: Value) {
435        msg_meta.route.points.push(Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()));
436
437        self.hub.send(Request::Msg(
438            MsgMeta {
439                tx: self.spec.addr.clone(),
440                rx: rx.to_owned(),
441                key: msg_meta.key,
442                kind: msg_meta.kind,
443                correlation_id: msg_meta.correlation_id,
444                route: msg_meta.route,
445                payload_size: 0,
446                auth_token: self.cfg.auth_token.clone(),
447                auth_data: self.cfg.auth_data.clone(),
448                attachments: vec![]
449            },
450            payload
451        ));
452    }
453    /// Sends message marked as event to component with addr held inside tx variable of component spec (which is stored in a hub struct).
454    pub fn send_event_tx(&mut self, key: &str, payload: Value) {
455        self.hub.send(Request::Msg(
456            MsgMeta {
457                tx: self.spec.addr.clone(),
458                rx: self.spec.tx.clone(),
459                key: key.to_owned(),
460                kind: MsgKind::Event,
461                correlation_id: Uuid::new_v4(),
462                route: Route {
463                    source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
464                    spec: RouteSpec::Simple,
465                    points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
466                },
467                payload_size: 0,
468                auth_token: self.cfg.auth_token.clone(),
469                auth_data: self.cfg.auth_data.clone(),
470                attachments: vec![]
471            }, 
472            payload
473        ));
474    }
475    /// Sends message marked as rpc request to component with addr held inside tx variable of component spec (which is stored in a hub struct).
476    pub fn send_rpc_tx(&mut self, key: &str, payload: Value) {
477        self.hub.send(Request::Msg(
478            MsgMeta {
479                tx: self.spec.addr.clone(),
480                rx: self.spec.tx.clone(),
481                key: key.to_owned(),
482                kind: MsgKind::RpcRequest,
483                correlation_id: Uuid::new_v4(),
484                route: Route {
485                    source: Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()),
486                    spec: RouteSpec::Simple,
487                    points: vec![Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone())]
488                },
489                payload_size: 0,
490                auth_token: self.cfg.auth_token.clone(),
491                auth_data: self.cfg.auth_data.clone(),
492                attachments: vec![]
493            },
494            payload
495        ));
496    }
497    /// Forwards current message to component with addr held inside tx variable of component spec (which is stored in a hub struct).
498    pub fn proxy_msg_tx(&mut self, mut msg_meta: MsgMeta, payload: Value) {
499        msg_meta.route.points.push(Participator::Component(self.spec.addr.clone(), self.cfg.app_addr.clone(), self.cfg.client_addr.clone()));
500        self.hub.send(Request::Msg(
501            MsgMeta {
502                tx: self.spec.addr.clone(),
503                rx: self.spec.tx.clone(),
504                key: msg_meta.key,
505                kind: msg_meta.kind,
506                correlation_id: msg_meta.correlation_id,
507                route: msg_meta.route,
508                payload_size: 0,
509                auth_token: self.cfg.auth_token.clone(),
510                auth_data: self.cfg.auth_data.clone(),
511                attachments: vec![]
512            },
513            payload
514        ));
515    }
516
517    /// Function for making request without using protocol described in sp-dto crate. Simple wrapper around fetch API.
518    pub fn rpc_post_string(&mut self, payload: String) {
519        let url = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
520        self.hub.send(Request::PostStringRpc(url, Uuid::new_v4(), payload));
521    }
522
523    /// Function for making request without using protocol described in sp-dto crate. Simple wrapper around fetch API.
524    pub fn rpc_post_binary(&mut self, payload: Vec<u8>) {
525        let url = self.cfg.fetch_url.clone().expect("fetch host is empty on server rpc");
526        self.hub.send(Request::PostBinaryRpc(url, Uuid::new_v4(), payload));
527    }
528
529    /// Function for making request without using protocol described in sp-dto crate. Simple wrapper around fetch API.
530    pub fn rpc_post_string_custom_url(&mut self, url: String, payload: String) {        
531        self.hub.send(Request::PostStringRpc(url, Uuid::new_v4(), payload));
532    }
533
534    /// Function for making request without using protocol described in sp-dto crate. Simple wrapper around fetch API.
535    pub fn rpc_post_binary_custom_url(&mut self, url: String, payload: Vec<u8>) {        
536        self.hub.send(Request::PostBinaryRpc(url, Uuid::new_v4(), payload));
537    }
538
539    /// Function for making request without using protocol described in sp-dto crate. Simple wrapper around fetch API.
540    pub fn rpc_get_string_custom_url(&mut self, url: String) {
541        self.hub.send(Request::GetStringRpc(url, Uuid::new_v4()));
542    }
543
544    /// Function for making request without using protocol described in sp-dto crate. Simple wrapper around fetch API.
545    pub fn rpc_get_binary_custom_url(&mut self, url: String) {        
546        self.hub.send(Request::GetBinaryRpc(url, Uuid::new_v4()));
547    }
548}