vertx_rust/vertx/
mod.rs

1pub mod cm;
2pub mod message;
3
4use crate::http::HttpServer;
5use crate::net;
6use crate::net::NetServer;
7use crate::vertx::cm::{ClusterManager, ClusterNodeInfo, ServerID};
8use crate::vertx::message::{Message, Body, MessageInner};
9use core::fmt::Debug;
10use hashbrown::HashMap;
11use log::{debug, error, info, trace, warn};
12use signal_hook::iterator::Signals;
13use std::sync::atomic::{AtomicBool, AtomicI16, Ordering};
14use std::{
15    sync::{Arc},
16};
17use std::future::Future;
18use tokio::net::TcpStream;
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use std::marker::PhantomData;
21use std::panic::RefUnwindSafe;
22use std::pin::Pin;
23use std::process::exit;
24use atomic_refcell::AtomicRefCell;
25use crossbeam_channel::{bounded, Receiver, Sender};
26use dashmap::DashMap;
27// use futures::future::BoxFuture;
28use parking_lot::Mutex;
29use tokio::task::JoinHandle;
30#[cfg(feature = "catch_unwind")]
31use futures::FutureExt;
32
33// type BoxFnMessage<CM> = Box<dyn Fn(&mut Message, Arc<EventBus<CM>>) + Send + Sync>;
34// type PinBoxFnMessage<CM> = Box<dyn Fn(&Message, Arc<EventBus<CM>>) + Send + Sync>;
35pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
36type PinBoxFnMessage<CM> = Pin<Box<dyn Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe>>;
37
38
39pub fn wrap_with_catch_unwind<CM, F>(func: F) -> PinBoxFnMessage<CM>
40    where
41        CM: ClusterManager + Send + Sync + 'static + RefUnwindSafe,
42        F: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
43{
44    let wrapped_func = move |msg: Arc<Message>, bus: Arc<EventBus<CM>>| {
45        let result = std::panic::catch_unwind(|| func(msg.clone(), bus.clone()));
46        match result {
47            Ok(future) => future,
48            Err(err) => Box::pin(async move {
49                if let Some (err_msg) = err.downcast_ref::<&str>() {
50                    error!("{:?} in function: {:?}", err_msg, std::any::type_name::<F>());
51                    msg.reply(Body::Panic(format!("{:?} in function: {:?}", err_msg, std::any::type_name::<F>())));
52                } else {
53                    msg.reply(Body::Panic("Unknown panic!".to_string()));
54                }
55            }),
56        }
57    };
58
59    Box::pin(wrapped_func)
60}
61
62lazy_static! {
63    static ref TCPS: Arc<HashMap<String, Arc<TcpStream>>> = Arc::new(HashMap::new());
64    static ref DO_INVOKE: AtomicBool = AtomicBool::new(true);
65}
66
67//Vertx options
68#[derive(Debug, Clone)]
69pub struct VertxOptions {
70    //Worker pool size, default number of cpu/2
71    worker_pool_size: usize,
72    //Event bus options
73    event_bus_options: EventBusOptions,
74}
75
76impl VertxOptions {
77    pub fn worker_pool_size(&mut self, size: usize) -> &mut Self {
78        self.worker_pool_size = size;
79        self
80    }
81
82    pub fn event_bus_options(&mut self) -> &mut EventBusOptions {
83        &mut self.event_bus_options
84    }
85}
86
87impl Default for VertxOptions {
88    fn default() -> Self {
89        let cpus = num_cpus::get();
90        let vertx_port: u16 = 0;
91        let vertx_host = "127.0.0.1".to_owned();
92        VertxOptions {
93            worker_pool_size: cpus / 2,
94            event_bus_options: EventBusOptions::from((vertx_host, vertx_port)),
95        }
96    }
97}
98
99//Event bus options
100#[derive(Debug, Clone)]
101pub struct EventBusOptions {
102    //Event bus pool size, default number of cpu
103    event_bus_pool_size: usize,
104    //Event bus queue size, default is 2000
105    event_bus_queue_size: usize,
106    //Event bus host, default 127.0.0.1
107    vertx_host: String,
108    //Event bus port, default 0
109    vertx_port: u16,
110}
111
112impl EventBusOptions {
113    pub fn event_bus_pool_size(&mut self, size: usize) -> &mut Self {
114        self.event_bus_pool_size = size;
115        self
116    }
117
118    pub fn host(&mut self, host: String) -> &mut Self {
119        self.vertx_host = host;
120        self
121    }
122
123    pub fn port(&mut self, port: u16) -> &mut Self {
124        self.vertx_port = port;
125        self
126    }
127
128    pub fn event_bus_queue_size(&mut self, size: usize) -> &mut Self {
129        self.event_bus_queue_size = size;
130        self
131    }
132}
133
134impl From<(String, u16)> for EventBusOptions {
135    fn from(opts: (String, u16)) -> Self {
136        let cpus = num_cpus::get();
137        EventBusOptions {
138            event_bus_pool_size: cpus,
139            vertx_host: opts.0,
140            vertx_port: opts.1,
141            event_bus_queue_size: 2000,
142        }
143    }
144}
145
146impl Default for EventBusOptions {
147    fn default() -> Self {
148        let cpus = num_cpus::get();
149        let vertx_port: u16 = 0;
150        EventBusOptions {
151            event_bus_pool_size: cpus / 2,
152            vertx_host: String::from("127.0.0.1"),
153            vertx_port,
154            event_bus_queue_size: 2000,
155        }
156    }
157}
158
159pub struct Vertx<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> {
160    #[allow(dead_code)]
161    options: VertxOptions,
162    event_bus: Arc<EventBus<CM>>,
163    ph: PhantomData<CM>,
164}
165
166impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> Vertx<CM> {
167    pub fn new(options: VertxOptions) -> Vertx<CM> {
168        let event_bus = EventBus::<CM>::new(options.event_bus_options.clone());
169        Vertx {
170            options,
171            event_bus: Arc::new(event_bus),
172            ph: PhantomData,
173        }
174    }
175
176    pub fn set_cluster_manager(&mut self, cm: CM) {
177        debug!("set_cluster_manager: {:?}", cm.get_nodes());
178        Arc::get_mut(&mut self.event_bus)
179            .unwrap()
180            .set_cluster_manager(cm);
181    }
182
183    pub async fn start(&self) {
184        info!("start vertx version {}", env!("CARGO_PKG_VERSION"));
185        let mut signals = Signals::new(&[2]).unwrap();
186        let event_bus = self.event_bus.clone();
187        tokio::spawn(async move {
188            let sig = signals.forever().next();
189            info!("Stopping vertx with signal: {:?}", sig.unwrap());
190            drop(event_bus.sender.data_ptr());
191            exit(sig.unwrap());
192        });
193        self.event_bus.start().await;
194    }
195
196    pub async fn create_http_server(&self) -> HttpServer<CM> {
197        let _ = self.event_bus().await;
198        HttpServer::new(Some(self.event_bus.clone()))
199    }
200
201    pub async fn create_net_server(&self) -> &'static mut NetServer<CM> {
202        let _ = self.event_bus().await;
203        NetServer::new(Some(self.event_bus.clone()))
204    }
205
206    pub async fn event_bus(&self) -> Arc<EventBus<CM>> {
207        if !self.event_bus.init {
208            let mut ev = self.event_bus.clone();
209            unsafe {
210                let opt_ev = Arc::get_mut_unchecked(&mut ev);
211                opt_ev.init().await;
212                opt_ev.init = true;
213            }
214        }
215        self.event_bus.clone()
216    }
217
218    pub async fn stop(&self) {
219        self.event_bus.stop().await;
220    }
221}
222
223pub struct EventBus<CM: 'static + ClusterManager + Send + Sync+ RefUnwindSafe> {
224    options: EventBusOptions,
225    consumers: Arc<HashMap<String, PinBoxFnMessage<CM>>>,
226    consumers_async: Arc<HashMap<String, PinBoxFnMessage<CM>>>,
227    callback_functions:
228        Arc<DashMap<String, PinBoxFnMessage<CM>>>,
229    pub(crate) sender: Mutex<Sender<Arc<Message>>>,
230    receiver_joiner: Arc<JoinHandle<()>>,
231    cluster_manager: Arc<Option<CM>>,
232    event_bus_port: u16,
233    self_arc: Option<Arc<EventBus<CM>>>,
234    init: bool,
235}
236
237impl <CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> RefUnwindSafe for EventBus<CM> {}
238
239impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> EventBus<CM> {
240    pub fn new(options: EventBusOptions) -> EventBus<CM> {
241        let (sender, _): (Sender<Arc<Message>>, Receiver<Arc<Message>>) = bounded(1);
242        let receiver_joiner = tokio::spawn(async {});
243        let ev = EventBus {
244            options,
245            consumers: Arc::new(HashMap::new()),
246            consumers_async: Arc::new(HashMap::new()),
247            callback_functions: Arc::new(DashMap::new()),
248            sender: Mutex::new(sender),
249            receiver_joiner: Arc::new(receiver_joiner),
250            cluster_manager: Arc::new(None),
251            event_bus_port: 0,
252            self_arc: None,
253            init: false,
254        };
255        ev
256    }
257
258    fn set_cluster_manager(&mut self, cm: CM) {
259        let mut m = cm;
260        m.join();
261        self.cluster_manager = Arc::new(Some(m));
262    }
263
264    async fn start(&self) {
265        let joiner = &self.receiver_joiner;
266        let h = joiner.clone();
267        unsafe {
268            let val: JoinHandle<()> = std::ptr::read(&*h);
269            val.await.unwrap();
270        }
271    }
272
273    async fn stop(&self) {
274        info!("stopping event_bus");
275        DO_INVOKE.store(false, Ordering::Relaxed);
276        self.send("stop", Body::String("stop".to_string()));
277    }
278
279    async fn init(&mut self) {
280        let (sender, receiver): (Sender<Arc<Message>>, Receiver<Arc<Message>>) = bounded(self.options.event_bus_queue_size);
281        self.sender = Mutex::new(sender);
282        let local_consumers = self.consumers.clone();
283        let local_cf = self.callback_functions.clone();
284        let local_sender = self.sender.lock().clone();
285        self.self_arc = Some(unsafe { Arc::from_raw(self) });
286
287        let net_server = self.create_net_server().await;
288        self.event_bus_port = net_server.port;
289        self.registry_in_cm();
290        info!(
291            "start event_bus on tcp://{}:{}",
292            self.options.vertx_host, self.event_bus_port
293        );
294
295        self.prepare_consumer_msg(receiver, local_consumers, local_cf, local_sender).await;
296    }
297
298    async fn prepare_consumer_msg(
299        &mut self,
300        receiver: Receiver<Arc<Message>>,
301        local_consumers: Arc<
302            HashMap<String, PinBoxFnMessage<CM>>,
303        >,
304        local_cf: Arc<
305            DashMap<String, PinBoxFnMessage<CM>>,
306        >,
307        local_sender: Sender<Arc<Message>>,
308    ) {
309        let local_cm = self.cluster_manager.clone();
310        let local_ev = self.self_arc.clone();
311        let local_local_consumers = self.consumers_async.clone();
312
313        let joiner = tokio::spawn(async move {
314            loop {
315                if !DO_INVOKE.load(Ordering::Relaxed) {
316                    return;
317                }
318                match receiver.recv() {
319                    Ok(msg) => {
320                        trace!("{:?} - {:?}", msg.invoke_count, msg.inner.borrow());
321                        let inner_consummers = local_consumers.clone();
322                        let inner_cf = local_cf.clone();
323                        let inner_sender = local_sender.clone();
324                        let inner_cm = local_cm.clone();
325                        let inner_ev = local_ev.clone();
326                        let inner_local_consumers = local_local_consumers.clone();
327
328                        // tokio::spawn(async move {
329                            let mut_msg = msg;
330                            match mut_msg.address() {
331                                Some(address) => {
332                                    if inner_local_consumers.contains_key(&address) {
333                                        <EventBus<CM>>::call_local_async(
334                                            &inner_local_consumers,
335                                            &inner_sender,
336                                            mut_msg,
337                                            &address,
338                                            inner_ev.clone().unwrap(),
339                                            inner_cf,
340                                        ).await;
341                                        // return;
342                                    } else {
343                                        // invoke function from consumer
344                                        let mut inner_cm0 = inner_cm.clone();
345                                        let manager = unsafe { Arc::get_mut_unchecked(&mut inner_cm0) };
346                                        match manager {
347                                            // ClusterManager
348                                            Some(cm) => {
349                                                debug!(
350                                                "manager: {:?}",
351                                                cm.get_subs().read().unwrap().len()
352                                            );
353                                                let nodes_lock = {
354                                                    let subs = cm.get_subs();
355                                                    let nodes = subs.read().unwrap();
356                                                    match nodes.get_vec(&address) {
357                                                        None => None,
358                                                        Some(n) => {
359                                                            Some(n.to_vec())
360                                                        }
361                                                    }
362                                                };
363
364                                                match nodes_lock {
365                                                    Some(n) => {
366                                                        if n.is_empty() {
367                                                            warn!("subs not found");
368                                                        } else {
369                                                            <EventBus<CM>>::send_message(
370                                                                &inner_consummers,
371                                                                &inner_sender,
372                                                                &inner_ev,
373                                                                mut_msg,
374                                                                &address,
375                                                                cm,
376                                                                &n,
377                                                                inner_cf,
378                                                            ).await;
379                                                        }
380                                                    }
381                                                    None => {
382                                                        let callback = {
383                                                            inner_cf.remove(&address)
384                                                        };
385
386                                                        match callback {
387                                                            Some(caller) => {
388                                                                #[cfg(not(feature = "catch_unwind"))]
389                                                                caller.1.call((mut_msg.clone(), inner_ev.clone().unwrap())).await;
390                                                                #[cfg(feature = "catch_unwind")]
391                                                                {
392                                                                    let handler = tokio::spawn(caller.1.call((mut_msg.clone(), inner_ev.clone().unwrap())));
393                                                                    match handler.catch_unwind().await {
394                                                                        Ok(ok) => match ok {
395                                                                            Ok(_) => {},
396                                                                            Err(err) => {
397                                                                                error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
398                                                                                mut_msg.reply(Body::Panic(err.to_string()));
399                                                                            }
400                                                                        },
401                                                                        Err(err) => {
402                                                                            if let Some (err_msg) = err.downcast_ref::<&str>() {
403                                                                                error!("{:?}", err_msg);
404                                                                                mut_msg.reply(Body::Panic(err_msg.to_string()));
405                                                                            } else {
406                                                                                mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
407                                                                            }
408
409                                                                        }
410                                                                    }
411                                                                }
412                                                            }
413                                                            None => {
414                                                                let host = mut_msg.inner.borrow().host.clone();
415                                                                let port = mut_msg.inner.borrow().port;
416                                                                <EventBus<CM>>::send_reply(mut_msg, host, port).await;
417                                                            }
418                                                        }
419                                                    }
420                                                }
421                                            }
422                                            None => {
423                                                // NoClusterManager
424                                                <EventBus<CM>>::call_local_func(
425                                                    &inner_consummers,
426                                                    &inner_sender,
427                                                    mut_msg,
428                                                    &address,
429                                                    inner_ev.clone().unwrap(),
430                                                    inner_cf,
431                                                ).await;
432                                            }
433                                        }
434                                    }
435
436                                }
437                                None => <EventBus<CM>>::call_replay(
438                                    inner_cf,
439                                    mut_msg,
440                                    inner_ev.clone().unwrap(),
441                                ).await,
442                            }
443                        // }).await;
444                    }
445                    Err(e) => {
446                        error!("Error: {:?}", e);
447                    }
448                }
449            }
450        });
451        self.receiver_joiner = Arc::new(joiner);
452    }
453
454    #[inline]
455    async fn send_message(
456        inner_consummers: &Arc<
457            HashMap<String, PinBoxFnMessage<CM>>,
458        >,
459        inner_sender: &Sender<Arc<Message>>,
460        inner_ev: &Option<Arc<EventBus<CM>>>,
461        mut_msg: Arc<Message>,
462        address: &str,
463        cm: &mut CM,
464        nodes: &[ClusterNodeInfo],
465        inner_cf: Arc<
466            DashMap<String, PinBoxFnMessage<CM>>,
467        >,
468    ) {
469        if mut_msg.inner.borrow().publish {
470            for node in nodes {
471                let mut node = Some(node);
472                <EventBus<CM>>::send_to_node(
473                    &inner_consummers,
474                    &inner_sender,
475                    inner_ev,
476                    mut_msg.clone(),
477                    &address,
478                    cm,
479                    inner_cf.clone(),
480                    &mut node,
481                ).await
482            }
483        } else {
484            let idx = cm.next(nodes.len());
485            let mut node = nodes.get(idx);
486            match node {
487                Some(_) => {}
488                None => {
489                    let idx = cm.next(nodes.len());
490                    node = nodes.get(idx);
491                }
492            }
493            <EventBus<CM>>::send_to_node(
494                &inner_consummers,
495                &inner_sender,
496                inner_ev,
497                mut_msg,
498                &address,
499                cm,
500                inner_cf,
501                &mut node,
502            ).await
503        }
504    }
505
506    #[inline]
507    async fn send_to_node(
508        inner_consummers: &&Arc<
509            HashMap<String, PinBoxFnMessage<CM>>,
510        >,
511        inner_sender: &&Sender<Arc<Message>>,
512        inner_ev: &Option<Arc<EventBus<CM>>>,
513        mut_msg: Arc<Message>,
514        address: &&str,
515        cm: &mut CM,
516        inner_cf: Arc<
517            DashMap<String, PinBoxFnMessage<CM>>,
518        >,
519        node: &mut Option<&ClusterNodeInfo>,
520    ) {
521        let node = node.unwrap();
522        let host = node.serverID.host.clone();
523        let port = node.serverID.port;
524
525        if node.nodeId == cm.get_node_id() {
526            <EventBus<CM>>::call_local_func(
527                &inner_consummers,
528                &inner_sender,
529                mut_msg,
530                &address,
531                inner_ev.clone().unwrap(),
532                inner_cf,
533            ).await
534        } else {
535            debug!("{:?}", node);
536            let node_id = node.nodeId.clone();
537            // let mut message: &'static mut Message = Box::leak(Box::from(mut_msg.clone()));
538            let message = mut_msg.clone();
539            // tokio::spawn(async move {
540                let tcp_stream = TCPS.get(&node_id);
541                match tcp_stream {
542                    Some(stream) => <EventBus<CM>>::get_stream(message, stream).await,
543
544                    None => {
545                        <EventBus<CM>>::create_stream(message, host, port, node_id).await;
546                    }
547                }
548            // });
549        }
550    }
551
552    #[inline]
553    async fn call_replay(
554        inner_cf: Arc<
555            DashMap<String, PinBoxFnMessage<CM>>,
556        >,
557        mut_msg: Arc<Message>,
558        ev: Arc<EventBus<CM>>,
559    ) {
560        let address = mut_msg.replay();
561        if let Some(address) = address {
562            // let mut map = inner_cf.lock();
563            let callback = inner_cf.remove(&address);
564            if let Some(caller) = callback {
565
566                #[cfg(not(feature = "catch_unwind"))]
567                caller.1(mut_msg.clone(), ev).await;
568                #[cfg(feature = "catch_unwind")]
569                {
570                    let handler = tokio::spawn(caller.1(mut_msg.clone(), ev));
571                    match handler.catch_unwind().await {
572                        Ok(ok) => match ok {
573                            Ok(_) => {},
574                            Err(err) => {
575                                error!("{:?}", err.to_string());
576                                mut_msg.reply(Body::Panic(err.to_string()));
577                            }
578                        },
579                        Err(err) => {
580                            if let Some (err_msg) = err.downcast_ref::<&str>() {
581                                error!("{:?}", err_msg);
582                                mut_msg.reply(Body::Panic(err_msg.to_string()));
583                            } else {
584                                mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
585                            }
586
587                        }
588                    }
589                }
590            }
591        }
592    }
593
594    #[inline]
595    async fn call_local_func(
596        inner_consummers: &Arc<
597            HashMap<String, PinBoxFnMessage<CM>>,
598        >,
599        inner_sender: &Sender<Arc<Message>>,
600        mut_msg: Arc<Message>,
601        address: &str,
602        ev: Arc<EventBus<CM>>,
603        inner_cf: Arc<
604            DashMap<String, PinBoxFnMessage<CM>>,
605        >,
606    ) {
607        let callback = inner_consummers.get(&address.to_string());
608        match callback {
609            Some(caller) => {
610                if !mut_msg.invoke.load(Ordering::SeqCst) {
611                    #[cfg(not(feature = "catch_unwind"))]
612                    caller(mut_msg.clone(), ev).await;
613                    #[cfg(feature = "catch_unwind")]
614                    {
615                        let handler = tokio::spawn(caller(mut_msg.clone(), ev));
616                        match handler.catch_unwind().await {
617                            Ok(ok) => match ok {
618                                Ok(_) => {},
619                                Err(err) => {
620                                    error!("{:?}", err.to_string());
621                                    mut_msg.reply(Body::Panic(err.to_string()));
622                                }
623                            },
624                            Err(err) => {
625                                if let Some (err_msg) = err.downcast_ref::<&str>() {
626                                    error!("{:?}", err_msg);
627                                    mut_msg.reply(Body::Panic(err_msg.to_string()));
628                                } else {
629                                    mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
630                                }
631
632                            }
633                        }
634                    }
635                    mut_msg.invoke.store(true, Ordering::SeqCst);
636                }
637                if mut_msg.inner.borrow().address.is_some() {
638                    if mut_msg.invoke_count.load(Ordering::SeqCst) == 5 {
639                        mut_msg.reply(Body::Panic("Message got stuck in a loop probably due to an error in the reply to the sub-message. Message deleted!".to_string()));
640                        inner_sender.send(mut_msg.clone()).unwrap();
641                    } else if mut_msg.invoke_count.load(Ordering::SeqCst) < 5  {
642                        inner_sender.send(mut_msg.clone()).unwrap();
643                    }
644                }
645            }
646            None => {
647                let callback = inner_cf.remove(address);
648                if let Some(caller) = callback {
649                    let msg = mut_msg.clone();
650                    #[cfg(not(feature = "catch_unwind"))]
651                    caller.1.call((msg, ev)).await;
652                    #[cfg(feature = "catch_unwind")]
653                    {
654                        let handler = tokio::spawn(caller.1.call((msg, ev)));
655                        match handler.catch_unwind().await {
656                            Ok(ok) => match ok {
657                                Ok(_) => {},
658                                Err(err) => {
659                                    error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
660                                    mut_msg.reply(Body::Panic(err.to_string()));
661                                }
662                            },
663                            Err(err) => {
664                                if let Some (err_msg) = err.downcast_ref::<&str>() {
665                                    error!("{:?}", err_msg);
666                                    mut_msg.reply(Body::Panic(err_msg.to_string()));
667                                } else {
668                                    mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
669                                }
670
671                            }
672                        }
673                    }
674                } else { // wysłanie odpowiedzi do requesta
675                    let address = mut_msg.inner.borrow().replay.clone();
676                    if let Some(address) = address {
677                        let callback = inner_cf.remove(&address);
678                        if let Some(caller) = callback {
679                            #[cfg(not(feature = "catch_unwind"))]
680                            caller.1.call((mut_msg.clone(), ev)).await;
681                            #[cfg(feature = "catch_unwind")]
682                            {
683                                let handler = tokio::spawn(caller.1.call((mut_msg.clone(), ev)));
684                                match handler.catch_unwind().await {
685                                    Ok(ok) => match ok {
686                                        Ok(_) => {},
687                                        Err(err) => {
688                                            error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
689                                            mut_msg.reply(Body::Panic(err.to_string()));
690                                        }
691                                    },
692                                    Err(err) => {
693                                        if let Some (err_msg) = err.downcast_ref::<&str>() {
694                                            error!("{:?}", err_msg);
695                                            mut_msg.reply(Body::Panic(err_msg.to_string()));
696                                        } else {
697                                            mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
698                                        }
699
700                                    }
701                                }
702                            }
703                        }
704                    }
705                }
706            }
707        }
708    }
709
710    #[inline]
711    async fn call_local_async(
712        inner_consummers: &Arc<
713            HashMap<String, PinBoxFnMessage<CM>>,
714        >,
715        inner_sender: &Sender<Arc<Message>>,
716        mut_msg: Arc<Message>,
717        address: &str,
718        ev: Arc<EventBus<CM>>,
719        inner_cf: Arc<
720            DashMap<String, PinBoxFnMessage<CM>>,
721        >,
722    ) {
723        let callback = inner_consummers.get(&address.to_string());
724        match callback {
725            Some(caller) => {
726                mut_msg.invoke_count.fetch_add(1, Ordering::SeqCst);
727                if !mut_msg.invoke.load(Ordering::SeqCst) {
728                    #[cfg(not(feature = "catch_unwind"))]
729                    caller.call((mut_msg.clone(), ev)).await;
730                    #[cfg(feature = "catch_unwind")]
731                    {
732                        let handler = tokio::spawn(caller.call((mut_msg.clone(), ev)));
733                        match handler.catch_unwind().await {
734                            Ok(ok) => match ok {
735                                Ok(_) => {},
736                                Err(err) => {
737                                    error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
738                                    mut_msg.reply(Body::Panic(err.to_string()));
739                                }
740                            },
741                            Err(err) => {
742                                if let Some (err_msg) = err.downcast_ref::<&str>() {
743                                    error!("{:?}", err_msg);
744                                    mut_msg.reply(Body::Panic(err_msg.to_string()));
745                                } else {
746                                    mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
747                                }
748
749                            }
750                        }
751                    }
752                    mut_msg.invoke.store(true, Ordering::SeqCst);
753                }
754                if mut_msg.inner.borrow().address.is_some() {
755                    if mut_msg.invoke_count.load(Ordering::SeqCst) == 5 {
756                        mut_msg.reply(Body::Panic("Message got stuck in a loop probably due to an error in the reply to the sub-message. Message deleted! ".to_string()));
757                        inner_sender.send(mut_msg.clone()).unwrap();
758                    } else if mut_msg.invoke_count.load(Ordering::SeqCst) < 5  {
759                        inner_sender.send(mut_msg.clone()).unwrap();
760                    }
761                }
762            }
763            None => {
764                let callback = inner_cf.remove(address);
765                if let Some(caller) = callback {
766                    #[cfg(not(feature = "catch_unwind"))]
767                    caller.1.call((mut_msg, ev)).await;
768                    #[cfg(feature = "catch_unwind")]
769                    {
770                        let handler = tokio::spawn(caller.1.call((mut_msg.clone(), ev)));
771                        match handler.catch_unwind().await {
772                            Ok(ok) => match ok {
773                                Ok(_) => {},
774                                Err(err) => {
775                                    error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
776                                    mut_msg.reply(Body::Panic(err.to_string()));
777                                }
778                            },
779                            Err(err) => {
780                                if let Some (err_msg) = err.downcast_ref::<&str>() {
781                                    error!("{:?}", err_msg);
782                                    mut_msg.reply(Body::Panic(err_msg.to_string()));
783                                } else {
784                                    mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
785                                }
786
787                            }
788                        }
789                    }
790                }
791            }
792        }
793    }
794
795    #[inline]
796    async fn get_stream(mut_msg: Arc<Message>, stream: &Arc<TcpStream>) {
797        let mut stream = stream.clone();
798        unsafe {
799            let tcps = Arc::get_mut_unchecked(&mut stream);
800            match tcps.write_all(&mut_msg.to_vec().unwrap()).await {
801                Ok(_r) => {
802                    let mut response = [0u8; 1];
803                    let _ = tcps.read(&mut response);
804                }
805                Err(e) => {
806                    warn!("Error in send message: {:?}", e);
807                }
808            }
809        }
810    }
811
812    #[inline]
813    async fn create_stream(mut_msg: Arc<Message>, host: String, port: i32, node_id: String) {
814        match TcpStream::connect(format!("{}:{}", host, port)).await {
815            Ok(mut stream) => match stream.write_all(&mut_msg.to_vec().unwrap()).await {
816                Ok(_r) => {
817                    let mut response = [0u8; 1];
818                    let _ = stream.read(&mut response);
819                    let mut tcps = TCPS.clone();
820                    unsafe {
821                        let tcps = Arc::get_mut_unchecked(&mut tcps);
822                        tcps.insert(node_id, Arc::new(stream));
823                    }
824                }
825                Err(e) => {
826                    warn!("Error in send message: {:?}", e);
827                }
828            },
829            Err(err) => {
830                warn!("Error in send message: {:?}", err);
831            }
832        }
833    }
834
835    #[inline]
836    async fn send_reply(mut_msg: Arc<Message>, host: String, port: i32) {
837        // tokio::spawn(async move {
838
839            let tcp_stream = TCPS.get(&format!("{}_{}", host.clone(), port));
840            match tcp_stream {
841                Some(stream) => unsafe {
842                    let mut stream = stream.clone();
843                    let stream = Arc::get_mut_unchecked(&mut stream);
844                    match stream.write_all(&mut_msg.to_vec().unwrap()).await {
845                        Ok(_) => {},
846                        Err(e) => {
847                            warn!("Error in send message: {:?}", e);
848                        }
849                    }
850                },
851                None => {
852                    match TcpStream::connect(format!("{}:{}", host, port)).await {
853                        Ok(mut stream) => match stream.write_all(&mut_msg.to_vec().unwrap()).await {
854                            Ok(_r) => {
855                                let mut tcps = TCPS.clone();
856                                unsafe {
857                                    let tcps = Arc::get_mut_unchecked(&mut tcps);
858                                    tcps.insert(format!("{}_{}", host.clone(), port), Arc::new(stream));
859                                }
860                            }
861                            Err(e) => {
862                                warn!("Error in send message: {:?}", e);
863                            }
864                        },
865                        Err(err) => {
866                            warn!("Error in send message: {:?}", err);
867                        }
868                    }
869                }
870            }
871        // });
872
873    }
874
875    #[inline]
876    fn registry_in_cm(&mut self) {
877        let opt_cm = Arc::get_mut(&mut self.cluster_manager).unwrap();
878        match opt_cm {
879            Some(cm) => {
880                cm.set_cluster_node_info(ClusterNodeInfo {
881                    nodeId: cm.get_node_id(),
882                    serverID: ServerID {
883                        host: self.options.vertx_host.clone(),
884                        port: self.event_bus_port as i32,
885                    },
886                });
887            }
888            None => {}
889        }
890    }
891
892    async fn create_net_server(&mut self) -> &mut NetServer<CM> {
893        let net_server = net::NetServer::<CM>::new(self.self_arc.clone());
894        net_server.listen_for_message(self.options.vertx_port, move |req, send| {
895            let resp = vec![];
896            let msg = Arc::new(Message::from(req));
897            debug!("net_server => {:?}", msg);
898
899            let _ = send.send(msg);
900
901            resp
902        }).await;
903        net_server
904    }
905
906    pub fn local_consumer<OP>(&self, address: &str, op: OP)
907        where
908            OP: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
909    {
910        let local_op = wrap_with_catch_unwind(op);
911        unsafe {
912            let mut local_cons = self.consumers_async.clone();
913            Arc::get_mut_unchecked(&mut local_cons).insert(address.to_string(), local_op);
914        }
915    }
916
917    pub fn consumer<OP>(&self, address: &str, op: OP)
918    where
919        OP: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
920    {
921        unsafe {
922            let mut local_cons = self.consumers.clone();
923            let local_op = wrap_with_catch_unwind(op);
924            Arc::get_mut_unchecked(&mut local_cons).insert(address.to_string(), local_op);
925        }
926        match self.cluster_manager.as_ref() {
927            Some(cm) => {
928                cm.add_sub(address.to_string());
929            }
930            None => {}
931        };
932    }
933
934    #[inline]
935    pub fn send(&self, address: &str, request: Body) {
936        let addr = address.to_owned();
937        let message_inner = MessageInner {
938            address: Some(addr),
939            replay: None,
940            body: Arc::new(request),
941            ..Default::default()
942        };
943        let message = Message{
944            inner: AtomicRefCell::new(message_inner),
945            invoke: Default::default(),
946            invoke_count: AtomicI16::new(0)
947        };
948        let local_sender = self.sender.lock();
949        local_sender.send(Arc::new(message)).unwrap();
950    }
951
952    #[inline]
953    pub fn publish(&self, address: &str, request: Body) {
954        let addr = address.to_owned();
955        let message_inner = MessageInner {
956            address: Some(addr),
957            replay: None,
958            body: Arc::new(request),
959            publish: true,
960            ..Default::default()
961        };
962        let message = Message{
963            inner: AtomicRefCell::new(message_inner),
964            invoke: Default::default(),
965            invoke_count: AtomicI16::new(0)
966        };
967        let local_sender = self.sender.lock();
968        local_sender.send(Arc::new(message)).unwrap();
969    }
970
971    #[inline]
972    pub fn request<OP>(&self, address: &str, request: Body, op: OP)
973    where
974        OP: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
975    {
976        let addr = address.to_owned();
977        let message_inner = MessageInner {
978            address: Some(addr),
979            replay: Some(format!(
980                "__vertx.reply.{}",
981                uuid::Uuid::new_v4().to_string()
982            )),
983            body: Arc::new(request),
984            host: self.options.vertx_host.clone(),
985            port: self.event_bus_port as i32,
986            ..Default::default()
987        };
988        let message = Message{
989            inner: AtomicRefCell::new(message_inner),
990            invoke: Default::default(),
991            invoke_count: AtomicI16::new(0)
992        };
993        let local_cons = self.callback_functions.clone();
994        let local_op = wrap_with_catch_unwind(op);
995        local_cons
996            .insert(message.replay().unwrap(), local_op);
997        let local_sender = self.sender.lock();
998        if let Err(err) = local_sender.send(Arc::new(message)) {
999            warn!("{:?}", err);
1000        }
1001    }
1002}