1pub mod cm;
2pub mod message;
3
4use crate::http::HttpServer;
5use crate::net;
6use crate::net::NetServer;
7use crate::vertx::cm::{ClusterManager, ClusterNodeInfo, ServerID};
8use crate::vertx::message::{Message, Body, MessageInner};
9use core::fmt::Debug;
10use hashbrown::HashMap;
11use log::{debug, error, info, trace, warn};
12use signal_hook::iterator::Signals;
13use std::sync::atomic::{AtomicBool, AtomicI16, Ordering};
14use std::{
15 sync::{Arc},
16};
17use std::future::Future;
18use tokio::net::TcpStream;
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use std::marker::PhantomData;
21use std::panic::RefUnwindSafe;
22use std::pin::Pin;
23use std::process::exit;
24use atomic_refcell::AtomicRefCell;
25use crossbeam_channel::{bounded, Receiver, Sender};
26use dashmap::DashMap;
27use parking_lot::Mutex;
29use tokio::task::JoinHandle;
30#[cfg(feature = "catch_unwind")]
31use futures::FutureExt;
32
33pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
36type PinBoxFnMessage<CM> = Pin<Box<dyn Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe>>;
37
38
39pub fn wrap_with_catch_unwind<CM, F>(func: F) -> PinBoxFnMessage<CM>
40 where
41 CM: ClusterManager + Send + Sync + 'static + RefUnwindSafe,
42 F: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
43{
44 let wrapped_func = move |msg: Arc<Message>, bus: Arc<EventBus<CM>>| {
45 let result = std::panic::catch_unwind(|| func(msg.clone(), bus.clone()));
46 match result {
47 Ok(future) => future,
48 Err(err) => Box::pin(async move {
49 if let Some (err_msg) = err.downcast_ref::<&str>() {
50 error!("{:?} in function: {:?}", err_msg, std::any::type_name::<F>());
51 msg.reply(Body::Panic(format!("{:?} in function: {:?}", err_msg, std::any::type_name::<F>())));
52 } else {
53 msg.reply(Body::Panic("Unknown panic!".to_string()));
54 }
55 }),
56 }
57 };
58
59 Box::pin(wrapped_func)
60}
61
62lazy_static! {
63 static ref TCPS: Arc<HashMap<String, Arc<TcpStream>>> = Arc::new(HashMap::new());
64 static ref DO_INVOKE: AtomicBool = AtomicBool::new(true);
65}
66
67#[derive(Debug, Clone)]
69pub struct VertxOptions {
70 worker_pool_size: usize,
72 event_bus_options: EventBusOptions,
74}
75
76impl VertxOptions {
77 pub fn worker_pool_size(&mut self, size: usize) -> &mut Self {
78 self.worker_pool_size = size;
79 self
80 }
81
82 pub fn event_bus_options(&mut self) -> &mut EventBusOptions {
83 &mut self.event_bus_options
84 }
85}
86
87impl Default for VertxOptions {
88 fn default() -> Self {
89 let cpus = num_cpus::get();
90 let vertx_port: u16 = 0;
91 let vertx_host = "127.0.0.1".to_owned();
92 VertxOptions {
93 worker_pool_size: cpus / 2,
94 event_bus_options: EventBusOptions::from((vertx_host, vertx_port)),
95 }
96 }
97}
98
99#[derive(Debug, Clone)]
101pub struct EventBusOptions {
102 event_bus_pool_size: usize,
104 event_bus_queue_size: usize,
106 vertx_host: String,
108 vertx_port: u16,
110}
111
112impl EventBusOptions {
113 pub fn event_bus_pool_size(&mut self, size: usize) -> &mut Self {
114 self.event_bus_pool_size = size;
115 self
116 }
117
118 pub fn host(&mut self, host: String) -> &mut Self {
119 self.vertx_host = host;
120 self
121 }
122
123 pub fn port(&mut self, port: u16) -> &mut Self {
124 self.vertx_port = port;
125 self
126 }
127
128 pub fn event_bus_queue_size(&mut self, size: usize) -> &mut Self {
129 self.event_bus_queue_size = size;
130 self
131 }
132}
133
134impl From<(String, u16)> for EventBusOptions {
135 fn from(opts: (String, u16)) -> Self {
136 let cpus = num_cpus::get();
137 EventBusOptions {
138 event_bus_pool_size: cpus,
139 vertx_host: opts.0,
140 vertx_port: opts.1,
141 event_bus_queue_size: 2000,
142 }
143 }
144}
145
146impl Default for EventBusOptions {
147 fn default() -> Self {
148 let cpus = num_cpus::get();
149 let vertx_port: u16 = 0;
150 EventBusOptions {
151 event_bus_pool_size: cpus / 2,
152 vertx_host: String::from("127.0.0.1"),
153 vertx_port,
154 event_bus_queue_size: 2000,
155 }
156 }
157}
158
159pub struct Vertx<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> {
160 #[allow(dead_code)]
161 options: VertxOptions,
162 event_bus: Arc<EventBus<CM>>,
163 ph: PhantomData<CM>,
164}
165
166impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> Vertx<CM> {
167 pub fn new(options: VertxOptions) -> Vertx<CM> {
168 let event_bus = EventBus::<CM>::new(options.event_bus_options.clone());
169 Vertx {
170 options,
171 event_bus: Arc::new(event_bus),
172 ph: PhantomData,
173 }
174 }
175
176 pub fn set_cluster_manager(&mut self, cm: CM) {
177 debug!("set_cluster_manager: {:?}", cm.get_nodes());
178 Arc::get_mut(&mut self.event_bus)
179 .unwrap()
180 .set_cluster_manager(cm);
181 }
182
183 pub async fn start(&self) {
184 info!("start vertx version {}", env!("CARGO_PKG_VERSION"));
185 let mut signals = Signals::new(&[2]).unwrap();
186 let event_bus = self.event_bus.clone();
187 tokio::spawn(async move {
188 let sig = signals.forever().next();
189 info!("Stopping vertx with signal: {:?}", sig.unwrap());
190 drop(event_bus.sender.data_ptr());
191 exit(sig.unwrap());
192 });
193 self.event_bus.start().await;
194 }
195
196 pub async fn create_http_server(&self) -> HttpServer<CM> {
197 let _ = self.event_bus().await;
198 HttpServer::new(Some(self.event_bus.clone()))
199 }
200
201 pub async fn create_net_server(&self) -> &'static mut NetServer<CM> {
202 let _ = self.event_bus().await;
203 NetServer::new(Some(self.event_bus.clone()))
204 }
205
206 pub async fn event_bus(&self) -> Arc<EventBus<CM>> {
207 if !self.event_bus.init {
208 let mut ev = self.event_bus.clone();
209 unsafe {
210 let opt_ev = Arc::get_mut_unchecked(&mut ev);
211 opt_ev.init().await;
212 opt_ev.init = true;
213 }
214 }
215 self.event_bus.clone()
216 }
217
218 pub async fn stop(&self) {
219 self.event_bus.stop().await;
220 }
221}
222
223pub struct EventBus<CM: 'static + ClusterManager + Send + Sync+ RefUnwindSafe> {
224 options: EventBusOptions,
225 consumers: Arc<HashMap<String, PinBoxFnMessage<CM>>>,
226 consumers_async: Arc<HashMap<String, PinBoxFnMessage<CM>>>,
227 callback_functions:
228 Arc<DashMap<String, PinBoxFnMessage<CM>>>,
229 pub(crate) sender: Mutex<Sender<Arc<Message>>>,
230 receiver_joiner: Arc<JoinHandle<()>>,
231 cluster_manager: Arc<Option<CM>>,
232 event_bus_port: u16,
233 self_arc: Option<Arc<EventBus<CM>>>,
234 init: bool,
235}
236
237impl <CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> RefUnwindSafe for EventBus<CM> {}
238
239impl<CM: 'static + ClusterManager + Send + Sync + RefUnwindSafe> EventBus<CM> {
240 pub fn new(options: EventBusOptions) -> EventBus<CM> {
241 let (sender, _): (Sender<Arc<Message>>, Receiver<Arc<Message>>) = bounded(1);
242 let receiver_joiner = tokio::spawn(async {});
243 let ev = EventBus {
244 options,
245 consumers: Arc::new(HashMap::new()),
246 consumers_async: Arc::new(HashMap::new()),
247 callback_functions: Arc::new(DashMap::new()),
248 sender: Mutex::new(sender),
249 receiver_joiner: Arc::new(receiver_joiner),
250 cluster_manager: Arc::new(None),
251 event_bus_port: 0,
252 self_arc: None,
253 init: false,
254 };
255 ev
256 }
257
258 fn set_cluster_manager(&mut self, cm: CM) {
259 let mut m = cm;
260 m.join();
261 self.cluster_manager = Arc::new(Some(m));
262 }
263
264 async fn start(&self) {
265 let joiner = &self.receiver_joiner;
266 let h = joiner.clone();
267 unsafe {
268 let val: JoinHandle<()> = std::ptr::read(&*h);
269 val.await.unwrap();
270 }
271 }
272
273 async fn stop(&self) {
274 info!("stopping event_bus");
275 DO_INVOKE.store(false, Ordering::Relaxed);
276 self.send("stop", Body::String("stop".to_string()));
277 }
278
279 async fn init(&mut self) {
280 let (sender, receiver): (Sender<Arc<Message>>, Receiver<Arc<Message>>) = bounded(self.options.event_bus_queue_size);
281 self.sender = Mutex::new(sender);
282 let local_consumers = self.consumers.clone();
283 let local_cf = self.callback_functions.clone();
284 let local_sender = self.sender.lock().clone();
285 self.self_arc = Some(unsafe { Arc::from_raw(self) });
286
287 let net_server = self.create_net_server().await;
288 self.event_bus_port = net_server.port;
289 self.registry_in_cm();
290 info!(
291 "start event_bus on tcp://{}:{}",
292 self.options.vertx_host, self.event_bus_port
293 );
294
295 self.prepare_consumer_msg(receiver, local_consumers, local_cf, local_sender).await;
296 }
297
298 async fn prepare_consumer_msg(
299 &mut self,
300 receiver: Receiver<Arc<Message>>,
301 local_consumers: Arc<
302 HashMap<String, PinBoxFnMessage<CM>>,
303 >,
304 local_cf: Arc<
305 DashMap<String, PinBoxFnMessage<CM>>,
306 >,
307 local_sender: Sender<Arc<Message>>,
308 ) {
309 let local_cm = self.cluster_manager.clone();
310 let local_ev = self.self_arc.clone();
311 let local_local_consumers = self.consumers_async.clone();
312
313 let joiner = tokio::spawn(async move {
314 loop {
315 if !DO_INVOKE.load(Ordering::Relaxed) {
316 return;
317 }
318 match receiver.recv() {
319 Ok(msg) => {
320 trace!("{:?} - {:?}", msg.invoke_count, msg.inner.borrow());
321 let inner_consummers = local_consumers.clone();
322 let inner_cf = local_cf.clone();
323 let inner_sender = local_sender.clone();
324 let inner_cm = local_cm.clone();
325 let inner_ev = local_ev.clone();
326 let inner_local_consumers = local_local_consumers.clone();
327
328 let mut_msg = msg;
330 match mut_msg.address() {
331 Some(address) => {
332 if inner_local_consumers.contains_key(&address) {
333 <EventBus<CM>>::call_local_async(
334 &inner_local_consumers,
335 &inner_sender,
336 mut_msg,
337 &address,
338 inner_ev.clone().unwrap(),
339 inner_cf,
340 ).await;
341 } else {
343 let mut inner_cm0 = inner_cm.clone();
345 let manager = unsafe { Arc::get_mut_unchecked(&mut inner_cm0) };
346 match manager {
347 Some(cm) => {
349 debug!(
350 "manager: {:?}",
351 cm.get_subs().read().unwrap().len()
352 );
353 let nodes_lock = {
354 let subs = cm.get_subs();
355 let nodes = subs.read().unwrap();
356 match nodes.get_vec(&address) {
357 None => None,
358 Some(n) => {
359 Some(n.to_vec())
360 }
361 }
362 };
363
364 match nodes_lock {
365 Some(n) => {
366 if n.is_empty() {
367 warn!("subs not found");
368 } else {
369 <EventBus<CM>>::send_message(
370 &inner_consummers,
371 &inner_sender,
372 &inner_ev,
373 mut_msg,
374 &address,
375 cm,
376 &n,
377 inner_cf,
378 ).await;
379 }
380 }
381 None => {
382 let callback = {
383 inner_cf.remove(&address)
384 };
385
386 match callback {
387 Some(caller) => {
388 #[cfg(not(feature = "catch_unwind"))]
389 caller.1.call((mut_msg.clone(), inner_ev.clone().unwrap())).await;
390 #[cfg(feature = "catch_unwind")]
391 {
392 let handler = tokio::spawn(caller.1.call((mut_msg.clone(), inner_ev.clone().unwrap())));
393 match handler.catch_unwind().await {
394 Ok(ok) => match ok {
395 Ok(_) => {},
396 Err(err) => {
397 error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
398 mut_msg.reply(Body::Panic(err.to_string()));
399 }
400 },
401 Err(err) => {
402 if let Some (err_msg) = err.downcast_ref::<&str>() {
403 error!("{:?}", err_msg);
404 mut_msg.reply(Body::Panic(err_msg.to_string()));
405 } else {
406 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
407 }
408
409 }
410 }
411 }
412 }
413 None => {
414 let host = mut_msg.inner.borrow().host.clone();
415 let port = mut_msg.inner.borrow().port;
416 <EventBus<CM>>::send_reply(mut_msg, host, port).await;
417 }
418 }
419 }
420 }
421 }
422 None => {
423 <EventBus<CM>>::call_local_func(
425 &inner_consummers,
426 &inner_sender,
427 mut_msg,
428 &address,
429 inner_ev.clone().unwrap(),
430 inner_cf,
431 ).await;
432 }
433 }
434 }
435
436 }
437 None => <EventBus<CM>>::call_replay(
438 inner_cf,
439 mut_msg,
440 inner_ev.clone().unwrap(),
441 ).await,
442 }
443 }
445 Err(e) => {
446 error!("Error: {:?}", e);
447 }
448 }
449 }
450 });
451 self.receiver_joiner = Arc::new(joiner);
452 }
453
454 #[inline]
455 async fn send_message(
456 inner_consummers: &Arc<
457 HashMap<String, PinBoxFnMessage<CM>>,
458 >,
459 inner_sender: &Sender<Arc<Message>>,
460 inner_ev: &Option<Arc<EventBus<CM>>>,
461 mut_msg: Arc<Message>,
462 address: &str,
463 cm: &mut CM,
464 nodes: &[ClusterNodeInfo],
465 inner_cf: Arc<
466 DashMap<String, PinBoxFnMessage<CM>>,
467 >,
468 ) {
469 if mut_msg.inner.borrow().publish {
470 for node in nodes {
471 let mut node = Some(node);
472 <EventBus<CM>>::send_to_node(
473 &inner_consummers,
474 &inner_sender,
475 inner_ev,
476 mut_msg.clone(),
477 &address,
478 cm,
479 inner_cf.clone(),
480 &mut node,
481 ).await
482 }
483 } else {
484 let idx = cm.next(nodes.len());
485 let mut node = nodes.get(idx);
486 match node {
487 Some(_) => {}
488 None => {
489 let idx = cm.next(nodes.len());
490 node = nodes.get(idx);
491 }
492 }
493 <EventBus<CM>>::send_to_node(
494 &inner_consummers,
495 &inner_sender,
496 inner_ev,
497 mut_msg,
498 &address,
499 cm,
500 inner_cf,
501 &mut node,
502 ).await
503 }
504 }
505
506 #[inline]
507 async fn send_to_node(
508 inner_consummers: &&Arc<
509 HashMap<String, PinBoxFnMessage<CM>>,
510 >,
511 inner_sender: &&Sender<Arc<Message>>,
512 inner_ev: &Option<Arc<EventBus<CM>>>,
513 mut_msg: Arc<Message>,
514 address: &&str,
515 cm: &mut CM,
516 inner_cf: Arc<
517 DashMap<String, PinBoxFnMessage<CM>>,
518 >,
519 node: &mut Option<&ClusterNodeInfo>,
520 ) {
521 let node = node.unwrap();
522 let host = node.serverID.host.clone();
523 let port = node.serverID.port;
524
525 if node.nodeId == cm.get_node_id() {
526 <EventBus<CM>>::call_local_func(
527 &inner_consummers,
528 &inner_sender,
529 mut_msg,
530 &address,
531 inner_ev.clone().unwrap(),
532 inner_cf,
533 ).await
534 } else {
535 debug!("{:?}", node);
536 let node_id = node.nodeId.clone();
537 let message = mut_msg.clone();
539 let tcp_stream = TCPS.get(&node_id);
541 match tcp_stream {
542 Some(stream) => <EventBus<CM>>::get_stream(message, stream).await,
543
544 None => {
545 <EventBus<CM>>::create_stream(message, host, port, node_id).await;
546 }
547 }
548 }
550 }
551
552 #[inline]
553 async fn call_replay(
554 inner_cf: Arc<
555 DashMap<String, PinBoxFnMessage<CM>>,
556 >,
557 mut_msg: Arc<Message>,
558 ev: Arc<EventBus<CM>>,
559 ) {
560 let address = mut_msg.replay();
561 if let Some(address) = address {
562 let callback = inner_cf.remove(&address);
564 if let Some(caller) = callback {
565
566 #[cfg(not(feature = "catch_unwind"))]
567 caller.1(mut_msg.clone(), ev).await;
568 #[cfg(feature = "catch_unwind")]
569 {
570 let handler = tokio::spawn(caller.1(mut_msg.clone(), ev));
571 match handler.catch_unwind().await {
572 Ok(ok) => match ok {
573 Ok(_) => {},
574 Err(err) => {
575 error!("{:?}", err.to_string());
576 mut_msg.reply(Body::Panic(err.to_string()));
577 }
578 },
579 Err(err) => {
580 if let Some (err_msg) = err.downcast_ref::<&str>() {
581 error!("{:?}", err_msg);
582 mut_msg.reply(Body::Panic(err_msg.to_string()));
583 } else {
584 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
585 }
586
587 }
588 }
589 }
590 }
591 }
592 }
593
594 #[inline]
595 async fn call_local_func(
596 inner_consummers: &Arc<
597 HashMap<String, PinBoxFnMessage<CM>>,
598 >,
599 inner_sender: &Sender<Arc<Message>>,
600 mut_msg: Arc<Message>,
601 address: &str,
602 ev: Arc<EventBus<CM>>,
603 inner_cf: Arc<
604 DashMap<String, PinBoxFnMessage<CM>>,
605 >,
606 ) {
607 let callback = inner_consummers.get(&address.to_string());
608 match callback {
609 Some(caller) => {
610 if !mut_msg.invoke.load(Ordering::SeqCst) {
611 #[cfg(not(feature = "catch_unwind"))]
612 caller(mut_msg.clone(), ev).await;
613 #[cfg(feature = "catch_unwind")]
614 {
615 let handler = tokio::spawn(caller(mut_msg.clone(), ev));
616 match handler.catch_unwind().await {
617 Ok(ok) => match ok {
618 Ok(_) => {},
619 Err(err) => {
620 error!("{:?}", err.to_string());
621 mut_msg.reply(Body::Panic(err.to_string()));
622 }
623 },
624 Err(err) => {
625 if let Some (err_msg) = err.downcast_ref::<&str>() {
626 error!("{:?}", err_msg);
627 mut_msg.reply(Body::Panic(err_msg.to_string()));
628 } else {
629 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
630 }
631
632 }
633 }
634 }
635 mut_msg.invoke.store(true, Ordering::SeqCst);
636 }
637 if mut_msg.inner.borrow().address.is_some() {
638 if mut_msg.invoke_count.load(Ordering::SeqCst) == 5 {
639 mut_msg.reply(Body::Panic("Message got stuck in a loop probably due to an error in the reply to the sub-message. Message deleted!".to_string()));
640 inner_sender.send(mut_msg.clone()).unwrap();
641 } else if mut_msg.invoke_count.load(Ordering::SeqCst) < 5 {
642 inner_sender.send(mut_msg.clone()).unwrap();
643 }
644 }
645 }
646 None => {
647 let callback = inner_cf.remove(address);
648 if let Some(caller) = callback {
649 let msg = mut_msg.clone();
650 #[cfg(not(feature = "catch_unwind"))]
651 caller.1.call((msg, ev)).await;
652 #[cfg(feature = "catch_unwind")]
653 {
654 let handler = tokio::spawn(caller.1.call((msg, ev)));
655 match handler.catch_unwind().await {
656 Ok(ok) => match ok {
657 Ok(_) => {},
658 Err(err) => {
659 error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
660 mut_msg.reply(Body::Panic(err.to_string()));
661 }
662 },
663 Err(err) => {
664 if let Some (err_msg) = err.downcast_ref::<&str>() {
665 error!("{:?}", err_msg);
666 mut_msg.reply(Body::Panic(err_msg.to_string()));
667 } else {
668 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
669 }
670
671 }
672 }
673 }
674 } else { let address = mut_msg.inner.borrow().replay.clone();
676 if let Some(address) = address {
677 let callback = inner_cf.remove(&address);
678 if let Some(caller) = callback {
679 #[cfg(not(feature = "catch_unwind"))]
680 caller.1.call((mut_msg.clone(), ev)).await;
681 #[cfg(feature = "catch_unwind")]
682 {
683 let handler = tokio::spawn(caller.1.call((mut_msg.clone(), ev)));
684 match handler.catch_unwind().await {
685 Ok(ok) => match ok {
686 Ok(_) => {},
687 Err(err) => {
688 error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
689 mut_msg.reply(Body::Panic(err.to_string()));
690 }
691 },
692 Err(err) => {
693 if let Some (err_msg) = err.downcast_ref::<&str>() {
694 error!("{:?}", err_msg);
695 mut_msg.reply(Body::Panic(err_msg.to_string()));
696 } else {
697 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
698 }
699
700 }
701 }
702 }
703 }
704 }
705 }
706 }
707 }
708 }
709
710 #[inline]
711 async fn call_local_async(
712 inner_consummers: &Arc<
713 HashMap<String, PinBoxFnMessage<CM>>,
714 >,
715 inner_sender: &Sender<Arc<Message>>,
716 mut_msg: Arc<Message>,
717 address: &str,
718 ev: Arc<EventBus<CM>>,
719 inner_cf: Arc<
720 DashMap<String, PinBoxFnMessage<CM>>,
721 >,
722 ) {
723 let callback = inner_consummers.get(&address.to_string());
724 match callback {
725 Some(caller) => {
726 mut_msg.invoke_count.fetch_add(1, Ordering::SeqCst);
727 if !mut_msg.invoke.load(Ordering::SeqCst) {
728 #[cfg(not(feature = "catch_unwind"))]
729 caller.call((mut_msg.clone(), ev)).await;
730 #[cfg(feature = "catch_unwind")]
731 {
732 let handler = tokio::spawn(caller.call((mut_msg.clone(), ev)));
733 match handler.catch_unwind().await {
734 Ok(ok) => match ok {
735 Ok(_) => {},
736 Err(err) => {
737 error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
738 mut_msg.reply(Body::Panic(err.to_string()));
739 }
740 },
741 Err(err) => {
742 if let Some (err_msg) = err.downcast_ref::<&str>() {
743 error!("{:?}", err_msg);
744 mut_msg.reply(Body::Panic(err_msg.to_string()));
745 } else {
746 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
747 }
748
749 }
750 }
751 }
752 mut_msg.invoke.store(true, Ordering::SeqCst);
753 }
754 if mut_msg.inner.borrow().address.is_some() {
755 if mut_msg.invoke_count.load(Ordering::SeqCst) == 5 {
756 mut_msg.reply(Body::Panic("Message got stuck in a loop probably due to an error in the reply to the sub-message. Message deleted! ".to_string()));
757 inner_sender.send(mut_msg.clone()).unwrap();
758 } else if mut_msg.invoke_count.load(Ordering::SeqCst) < 5 {
759 inner_sender.send(mut_msg.clone()).unwrap();
760 }
761 }
762 }
763 None => {
764 let callback = inner_cf.remove(address);
765 if let Some(caller) = callback {
766 #[cfg(not(feature = "catch_unwind"))]
767 caller.1.call((mut_msg, ev)).await;
768 #[cfg(feature = "catch_unwind")]
769 {
770 let handler = tokio::spawn(caller.1.call((mut_msg.clone(), ev)));
771 match handler.catch_unwind().await {
772 Ok(ok) => match ok {
773 Ok(_) => {},
774 Err(err) => {
775 error!("{:?} in replay function: {:?}", err.to_string(), mut_msg.address());
776 mut_msg.reply(Body::Panic(err.to_string()));
777 }
778 },
779 Err(err) => {
780 if let Some (err_msg) = err.downcast_ref::<&str>() {
781 error!("{:?}", err_msg);
782 mut_msg.reply(Body::Panic(err_msg.to_string()));
783 } else {
784 mut_msg.reply(Body::Panic("Unknown panic!".to_string()));
785 }
786
787 }
788 }
789 }
790 }
791 }
792 }
793 }
794
795 #[inline]
796 async fn get_stream(mut_msg: Arc<Message>, stream: &Arc<TcpStream>) {
797 let mut stream = stream.clone();
798 unsafe {
799 let tcps = Arc::get_mut_unchecked(&mut stream);
800 match tcps.write_all(&mut_msg.to_vec().unwrap()).await {
801 Ok(_r) => {
802 let mut response = [0u8; 1];
803 let _ = tcps.read(&mut response);
804 }
805 Err(e) => {
806 warn!("Error in send message: {:?}", e);
807 }
808 }
809 }
810 }
811
812 #[inline]
813 async fn create_stream(mut_msg: Arc<Message>, host: String, port: i32, node_id: String) {
814 match TcpStream::connect(format!("{}:{}", host, port)).await {
815 Ok(mut stream) => match stream.write_all(&mut_msg.to_vec().unwrap()).await {
816 Ok(_r) => {
817 let mut response = [0u8; 1];
818 let _ = stream.read(&mut response);
819 let mut tcps = TCPS.clone();
820 unsafe {
821 let tcps = Arc::get_mut_unchecked(&mut tcps);
822 tcps.insert(node_id, Arc::new(stream));
823 }
824 }
825 Err(e) => {
826 warn!("Error in send message: {:?}", e);
827 }
828 },
829 Err(err) => {
830 warn!("Error in send message: {:?}", err);
831 }
832 }
833 }
834
835 #[inline]
836 async fn send_reply(mut_msg: Arc<Message>, host: String, port: i32) {
837 let tcp_stream = TCPS.get(&format!("{}_{}", host.clone(), port));
840 match tcp_stream {
841 Some(stream) => unsafe {
842 let mut stream = stream.clone();
843 let stream = Arc::get_mut_unchecked(&mut stream);
844 match stream.write_all(&mut_msg.to_vec().unwrap()).await {
845 Ok(_) => {},
846 Err(e) => {
847 warn!("Error in send message: {:?}", e);
848 }
849 }
850 },
851 None => {
852 match TcpStream::connect(format!("{}:{}", host, port)).await {
853 Ok(mut stream) => match stream.write_all(&mut_msg.to_vec().unwrap()).await {
854 Ok(_r) => {
855 let mut tcps = TCPS.clone();
856 unsafe {
857 let tcps = Arc::get_mut_unchecked(&mut tcps);
858 tcps.insert(format!("{}_{}", host.clone(), port), Arc::new(stream));
859 }
860 }
861 Err(e) => {
862 warn!("Error in send message: {:?}", e);
863 }
864 },
865 Err(err) => {
866 warn!("Error in send message: {:?}", err);
867 }
868 }
869 }
870 }
871 }
874
875 #[inline]
876 fn registry_in_cm(&mut self) {
877 let opt_cm = Arc::get_mut(&mut self.cluster_manager).unwrap();
878 match opt_cm {
879 Some(cm) => {
880 cm.set_cluster_node_info(ClusterNodeInfo {
881 nodeId: cm.get_node_id(),
882 serverID: ServerID {
883 host: self.options.vertx_host.clone(),
884 port: self.event_bus_port as i32,
885 },
886 });
887 }
888 None => {}
889 }
890 }
891
892 async fn create_net_server(&mut self) -> &mut NetServer<CM> {
893 let net_server = net::NetServer::<CM>::new(self.self_arc.clone());
894 net_server.listen_for_message(self.options.vertx_port, move |req, send| {
895 let resp = vec![];
896 let msg = Arc::new(Message::from(req));
897 debug!("net_server => {:?}", msg);
898
899 let _ = send.send(msg);
900
901 resp
902 }).await;
903 net_server
904 }
905
906 pub fn local_consumer<OP>(&self, address: &str, op: OP)
907 where
908 OP: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
909 {
910 let local_op = wrap_with_catch_unwind(op);
911 unsafe {
912 let mut local_cons = self.consumers_async.clone();
913 Arc::get_mut_unchecked(&mut local_cons).insert(address.to_string(), local_op);
914 }
915 }
916
917 pub fn consumer<OP>(&self, address: &str, op: OP)
918 where
919 OP: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
920 {
921 unsafe {
922 let mut local_cons = self.consumers.clone();
923 let local_op = wrap_with_catch_unwind(op);
924 Arc::get_mut_unchecked(&mut local_cons).insert(address.to_string(), local_op);
925 }
926 match self.cluster_manager.as_ref() {
927 Some(cm) => {
928 cm.add_sub(address.to_string());
929 }
930 None => {}
931 };
932 }
933
934 #[inline]
935 pub fn send(&self, address: &str, request: Body) {
936 let addr = address.to_owned();
937 let message_inner = MessageInner {
938 address: Some(addr),
939 replay: None,
940 body: Arc::new(request),
941 ..Default::default()
942 };
943 let message = Message{
944 inner: AtomicRefCell::new(message_inner),
945 invoke: Default::default(),
946 invoke_count: AtomicI16::new(0)
947 };
948 let local_sender = self.sender.lock();
949 local_sender.send(Arc::new(message)).unwrap();
950 }
951
952 #[inline]
953 pub fn publish(&self, address: &str, request: Body) {
954 let addr = address.to_owned();
955 let message_inner = MessageInner {
956 address: Some(addr),
957 replay: None,
958 body: Arc::new(request),
959 publish: true,
960 ..Default::default()
961 };
962 let message = Message{
963 inner: AtomicRefCell::new(message_inner),
964 invoke: Default::default(),
965 invoke_count: AtomicI16::new(0)
966 };
967 let local_sender = self.sender.lock();
968 local_sender.send(Arc::new(message)).unwrap();
969 }
970
971 #[inline]
972 pub fn request<OP>(&self, address: &str, request: Body, op: OP)
973 where
974 OP: Fn(Arc<Message>, Arc<EventBus<CM>>) -> BoxFuture<()> + Send + 'static + Sync + RefUnwindSafe,
975 {
976 let addr = address.to_owned();
977 let message_inner = MessageInner {
978 address: Some(addr),
979 replay: Some(format!(
980 "__vertx.reply.{}",
981 uuid::Uuid::new_v4().to_string()
982 )),
983 body: Arc::new(request),
984 host: self.options.vertx_host.clone(),
985 port: self.event_bus_port as i32,
986 ..Default::default()
987 };
988 let message = Message{
989 inner: AtomicRefCell::new(message_inner),
990 invoke: Default::default(),
991 invoke_count: AtomicI16::new(0)
992 };
993 let local_cons = self.callback_functions.clone();
994 let local_op = wrap_with_catch_unwind(op);
995 local_cons
996 .insert(message.replay().unwrap(), local_op);
997 let local_sender = self.sender.lock();
998 if let Err(err) = local_sender.send(Arc::new(message)) {
999 warn!("{:?}", err);
1000 }
1001 }
1002}