1use super::*;
2use crate::{
3 actors::{
4 Actor,
5 ActorPath,
6 Dispatcher,
7 DynActorRef,
8 NamedPath,
9 SystemPath,
10 Transport,
11 Transport::Tcp,
12 },
13 component::{Component, ComponentContext, ExecuteResult},
14 events::{NetworkDispatcherEvent, NetworkStatus, NetworkStatusPort, NetworkStatusRequest},
15 lookup::{ActorLookup, ActorStore, InsertResult, LookupResult},
16 messaging::{
17 ActorRegistration,
18 DispatchData,
19 DispatchEnvelope,
20 DispatchEvent,
21 MsgEnvelope,
22 NetMessage,
23 PathResolvable,
24 PolicyRegistration,
25 RegistrationEnvelope,
26 RegistrationError,
27 RegistrationEvent,
28 RegistrationPromise,
29 },
30 net::{ConnectionState, NetworkBridgeError, Protocol, SessionId, SocketAddr, buffers::*},
31 queue_manager::QueueManager,
32};
33use arc_swap::ArcSwap;
34use kompact::let_irrefutable;
35use rustc_hash::FxHashMap;
36use std::{collections::VecDeque, sync::Arc, time::Duration};
37
38mod defaults {
40 pub(crate) const RETRY_CONNECTIONS_INTERVAL: u64 = 5000;
41 pub(crate) const BOOT_TIMEOUT: u64 = 5000;
42 pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 10;
43 pub(crate) const SOFT_CONNECTION_LIMIT: u32 = 1000;
44 pub(crate) const HARD_CONNECTION_LIMIT: u32 = 1100;
45}
46
47type NetHashMap<K, V> = FxHashMap<K, V>;
48
49#[derive(Clone, Debug)]
65pub struct NetworkConfig {
66 addr: SocketAddr,
67 transport: Transport,
68 buffer_config: BufferConfig,
69 custom_allocator: Option<Arc<dyn ChunkAllocator>>,
70 tcp_nodelay: bool,
71 max_connection_retry_attempts: u8,
72 connection_retry_interval: u64,
73 boot_timeout: u64,
74 soft_connection_limit: u32,
75 hard_connection_limit: u32,
76}
77
78impl NetworkConfig {
79 pub fn new(addr: SocketAddr) -> Self {
82 NetworkConfig {
83 addr,
84 transport: Transport::Tcp,
85 buffer_config: BufferConfig::default(),
86 custom_allocator: None,
87 tcp_nodelay: true,
88 max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
89 connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
90 boot_timeout: defaults::BOOT_TIMEOUT,
91 soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
92 hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
93 }
94 }
95
96 pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
99 buffer_config.validate();
100 let mut cfg = NetworkConfig::new(addr);
101 cfg.set_buffer_config(buffer_config);
102 cfg
103 }
104
105 pub fn with_custom_allocator(
108 addr: SocketAddr,
109 buffer_config: BufferConfig,
110 custom_allocator: Arc<dyn ChunkAllocator>,
111 ) -> Self {
112 buffer_config.validate();
113 NetworkConfig {
114 addr,
115 transport: Transport::Tcp,
116 buffer_config,
117 custom_allocator: Some(custom_allocator),
118 tcp_nodelay: true,
119 max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
120 connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
121 boot_timeout: defaults::BOOT_TIMEOUT,
122 soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
123 hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
124 }
125 }
126
127 pub fn with_socket(mut self, addr: SocketAddr) -> Self {
129 self.addr = addr;
130 self
131 }
132
133 pub fn build(self) -> impl Fn(KPromise<()>) -> NetworkDispatcher {
138 move |notify_ready| NetworkDispatcher::with_config(self.clone(), notify_ready)
139 }
140
141 pub fn get_buffer_config(&self) -> &BufferConfig {
143 &self.buffer_config
144 }
145
146 pub fn set_buffer_config(&mut self, buffer_config: BufferConfig) {
148 self.buffer_config = buffer_config;
149 }
150
151 pub fn get_custom_allocator(&self) -> &Option<Arc<dyn ChunkAllocator>> {
153 &self.custom_allocator
154 }
155
156 pub fn get_tcp_nodelay(&self) -> bool {
158 self.tcp_nodelay
159 }
160
161 pub fn set_tcp_nodelay(&mut self, nodelay: bool) {
167 self.tcp_nodelay = nodelay;
168 }
169
170 pub fn set_max_connection_retry_attempts(&mut self, count: u8) {
175 self.max_connection_retry_attempts = count;
176 }
177
178 pub fn get_max_connection_retry_attempts(&self) -> u8 {
180 self.max_connection_retry_attempts
181 }
182
183 pub fn set_connection_retry_interval(&mut self, milliseconds: u64) {
187 self.connection_retry_interval = milliseconds;
188 }
189
190 pub fn get_connection_retry_interval(&self) -> u64 {
192 self.connection_retry_interval
193 }
194
195 pub fn set_boot_timeout(&mut self, milliseconds: u64) {
199 self.boot_timeout = milliseconds;
200 }
201
202 pub fn get_boot_timeout(&self) -> u64 {
204 self.boot_timeout
205 }
206
207 pub fn set_soft_connection_limit(&mut self, limit: u32) {
213 self.soft_connection_limit = limit;
214 }
215
216 pub fn get_soft_connection_limit(&self) -> u32 {
218 self.soft_connection_limit
219 }
220
221 pub fn set_hard_connection_limit(&mut self, limit: u32) {
228 self.hard_connection_limit = limit;
229 }
230
231 pub fn get_hard_connection_limit(&self) -> u32 {
233 self.hard_connection_limit
234 }
235}
236
237impl Default for NetworkConfig {
239 fn default() -> Self {
240 NetworkConfig {
241 addr: "127.0.0.1:0".parse().unwrap(),
242 transport: Transport::Tcp,
243 buffer_config: BufferConfig::default(),
244 custom_allocator: None,
245 tcp_nodelay: true,
246 max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
247 connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
248 boot_timeout: defaults::BOOT_TIMEOUT,
249 soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
250 hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
251 }
252 }
253}
254
255#[derive(ComponentDefinition)]
268pub struct NetworkDispatcher {
269 ctx: ComponentContext<NetworkDispatcher>,
270 connections: NetHashMap<SocketAddr, ConnectionState>,
272 cfg: NetworkConfig,
274 lookup: Arc<ArcSwap<ActorStore>>,
276 net_bridge: Option<net::Bridge>,
279 system_path: Option<SystemPath>,
281 queue_manager: QueueManager,
283 reaper: lookup::gc::ActorRefReaper,
285 notify_ready: Option<KPromise<()>>,
286 retry_map: FxHashMap<SocketAddr, u8>,
288 garbage_buffers: VecDeque<BufferChunk>,
289 network_status_port: ProvidedPort<NetworkStatusPort>,
291}
292
293impl NetworkDispatcher {
294 pub fn new(notify_ready: KPromise<()>) -> Self {
312 let config = NetworkConfig::default();
313 NetworkDispatcher::with_config(config, notify_ready)
314 }
315
316 pub fn with_config(cfg: NetworkConfig, notify_ready: KPromise<()>) -> Self {
321 let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new()));
322 let reaper = lookup::gc::ActorRefReaper::default();
324
325 NetworkDispatcher {
326 ctx: ComponentContext::uninitialised(),
327 connections: Default::default(),
328 cfg,
329 lookup,
330 net_bridge: None,
331 system_path: None,
332 queue_manager: QueueManager::new(),
333 reaper,
334 notify_ready: Some(notify_ready),
335 garbage_buffers: VecDeque::new(),
336 retry_map: Default::default(),
337 network_status_port: ProvidedPort::uninitialised(),
338 }
339 }
340
341 pub fn system_path_ref(&mut self) -> &SystemPath {
345 if self.system_path.is_none() {
346 let _ = self.system_path(); }
348 self.system_path
349 .as_ref()
350 .expect("Cached value should have been filled by calling self.system_path()!")
351 }
352
353 fn start(&mut self) {
354 debug!(self.ctx.log(), "Starting self and network bridge");
355 self.reaper = lookup::gc::ActorRefReaper::from_config(self.ctx.config());
356 self.start_bridge(self.cfg.addr);
357
358 let deadletter: DynActorRef = self.ctx.system().deadletter_ref().dyn_ref();
359 self.lookup.rcu(|current| {
360 let mut next = ActorStore::clone(current);
361 next.insert(PathResolvable::System, deadletter.clone())
362 .expect("Deadletter shouldn't error");
363 next
364 });
365
366 self.schedule_retries();
367 }
368
369 fn start_bridge(&mut self, address: SocketAddr) {
370 let dispatcher = self
371 .actor_ref()
372 .hold()
373 .expect("Self can hardly be deallocated!");
374 let bridge_logger = self.ctx.log().new(o!("owner" => "Bridge"));
375 let network_thread_logger = self.ctx.log().new(o!("owner" => "NetworkThread"));
376 let (mut bridge, _addr) = net::Bridge::new(
377 self.lookup.clone(),
378 network_thread_logger,
379 bridge_logger,
380 address,
381 dispatcher.clone(),
382 &self.cfg,
383 );
384 bridge.set_dispatcher(dispatcher);
385 self.net_bridge = Some(bridge);
386 }
387
388 fn handle_network_failure(&mut self) {
389 self.network_status_port
390 .trigger(NetworkStatus::CriticalNetworkFailure);
391 let faulty_bridge = self.net_bridge.take();
392 let connections: Vec<(SocketAddr, ConnectionState)> = self.connections.drain().collect();
393 for (address, state) in connections {
394 if let ConnectionState::Connected(id) = state {
395 self.connection_lost(SystemPath::with_socket(Transport::Tcp, address), id);
396 } else {
397 self.connections.insert(address, state);
398 }
399 }
400 let bound_address = faulty_bridge
402 .map(|b| b.local_addr().unwrap_or(self.cfg.addr))
403 .unwrap_or(self.cfg.addr);
404 self.start_bridge(bound_address);
405 }
406
407 fn stop(&mut self) {
408 if let Some(Err(e)) = self.net_bridge.take().map(|bridge| bridge.stop()) {
409 error!(
410 self.ctx().log(),
411 "NetworkBridge did not shut down as expected! Error was:\n {:?}\n", e
412 );
413 }
414 }
415
416 fn kill(&mut self) {
417 if let Some(Err(e)) = self.net_bridge.take().map(|bridge| bridge.kill()) {
418 error!(
419 self.ctx().log(),
420 "NetworkBridge did not shut down as expected! Error was:\n {:?}\n", e
421 );
422 }
423 }
424
425 fn schedule_reaper(&mut self) {
426 if !self.reaper.is_scheduled() {
427 self.reaper.schedule();
429 } else {
430 let num_reaped = self.reaper.run(&self.lookup);
432 if num_reaped == 0 {
433 self.reaper.strategy_mut().incr();
435 } else {
436 self.reaper.strategy_mut().decr();
437 }
438 }
439 let next_wakeup = self.reaper.strategy().curr();
440 debug!(
441 self.ctx().log(),
442 "Scheduling reaping at {:?}ms", next_wakeup
443 );
444
445 let mut retry_queue = VecDeque::new();
446 for mut trash in self.garbage_buffers.drain(..) {
447 if !trash.free() {
448 retry_queue.push_back(trash);
449 }
450 }
451 self.garbage_buffers.append(&mut retry_queue);
453
454 self.schedule_once(Duration::from_millis(next_wakeup), move |target, _id| {
455 target.schedule_reaper();
456 Handled::OK
457 });
458 }
459
460 fn schedule_retries(&mut self) {
461 let drain = self.retry_map.clone();
463 self.retry_map.clear();
464 for (addr, retry) in drain {
465 if retry < self.cfg.max_connection_retry_attempts {
466 self.retry_map.insert(addr, retry + 1);
468 if let Some(bridge) = &self.net_bridge {
469 debug!(
471 self.ctx().log(),
472 "Dispatcher retrying connection to host {}, attempt {}/{}",
473 addr,
474 retry,
475 self.cfg.max_connection_retry_attempts
476 );
477 bridge.connect(Transport::Tcp, addr).unwrap();
478 }
479 } else {
480 info!(
482 self.ctx().log(),
483 "Dispatcher giving up on remote host {}, dropping queues", addr
484 );
485 self.queue_manager.drop_queue(&addr);
486 self.connections.remove(&addr);
487 self.network_status_port
488 .trigger(NetworkStatus::ConnectionDropped(SystemPath::with_socket(
489 Transport::Tcp,
490 addr,
491 )));
492 }
493 }
494 self.schedule_once(
495 Duration::from_millis(self.cfg.connection_retry_interval),
496 move |target, _id| {
497 target.schedule_retries();
498 Handled::OK
499 },
500 );
501 }
502
503 fn on_event(&mut self, ev: Box<dyn DispatchEvent>) {
504 let ev = match ev.into_any().downcast::<NetworkDispatcherEvent>() {
505 Ok(ev) => *ev,
506 Err(_) => {
507 warn!(
508 self.ctx.log(),
509 "Ignoring unexpected dispatcher event in NetworkDispatcher",
510 );
511 return;
512 }
513 };
514 match ev {
515 NetworkDispatcherEvent::Network(ev) => match ev {
516 NetworkStatus::ConnectionEstablished(system_path, session) => {
517 self.connection_established(system_path, session)
518 }
519 NetworkStatus::ConnectionLost(system_path, session) => {
520 self.connection_lost(system_path, session)
521 }
522 NetworkStatus::ConnectionClosed(system_path, session) => {
523 self.connection_closed(system_path, session)
524 }
525 NetworkStatus::ConnectionDropped(system_path) => {
526 let _ = self.retry_map.remove(&system_path.socket_address());
527 self.network_status_port
528 .trigger(NetworkStatus::ConnectionDropped(system_path));
529 }
530 NetworkStatus::BlockedSystem(system_path) => {
531 self.connections
532 .insert(system_path.socket_address(), ConnectionState::Blocked);
533 self.network_status_port
534 .trigger(NetworkStatus::BlockedSystem(system_path));
535 }
536 NetworkStatus::BlockedIp(ip_addr) => {
537 self.network_status_port
538 .trigger(NetworkStatus::BlockedIp(ip_addr));
539 }
540 NetworkStatus::BlockedIpNet(ip_net) => {
541 self.network_status_port
542 .trigger(NetworkStatus::BlockedIpNet(ip_net));
543 }
544 NetworkStatus::AllowedSystem(system_path) => {
545 self.connections.remove(&system_path.socket_address());
546 self.network_status_port
547 .trigger(NetworkStatus::AllowedSystem(system_path));
548 }
549 NetworkStatus::AllowedIp(ip_addr) => {
550 self.network_status_port
551 .trigger(NetworkStatus::AllowedIp(ip_addr));
552 }
553 NetworkStatus::AllowedIpNet(ip_net) => {
554 self.network_status_port
555 .trigger(NetworkStatus::AllowedIpNet(ip_net));
556 }
557 NetworkStatus::SoftConnectionLimitExceeded => self
558 .network_status_port
559 .trigger(NetworkStatus::SoftConnectionLimitExceeded),
560 NetworkStatus::HardConnectionLimitReached => self
561 .network_status_port
562 .trigger(NetworkStatus::HardConnectionLimitReached),
563 NetworkStatus::CriticalNetworkFailure => self.handle_network_failure(),
564 },
565 NetworkDispatcherEvent::RejectedData((addr, data)) => {
566 self.queue_manager.enqueue_priority_data(*data, addr);
568 self.retry_map.entry(addr).or_insert(0);
569 }
570 }
571 }
572
573 fn connection_established(&mut self, system_path: SystemPath, id: SessionId) {
574 info!(
575 self.ctx().log(),
576 "registering newly connected conn at {:?}", system_path
577 );
578 let addr = &system_path.socket_address();
579 self.network_status_port
580 .trigger(NetworkStatus::ConnectionEstablished(system_path, id));
581 let _ = self.retry_map.remove(addr);
582 if self.queue_manager.has_data(addr) {
583 while let Some(frame) = self.queue_manager.pop_data(addr) {
585 if let Some(bridge) = &self.net_bridge {
586 if let Err(e) = bridge.route(*addr, frame, net::Protocol::Tcp) {
588 error!(self.ctx.log(), "Bridge error while routing {:?}", e);
589 }
590 }
591 }
592 }
593 self.connections
594 .insert(*addr, ConnectionState::Connected(id));
595 }
596
597 fn connection_closed(&mut self, system_path: SystemPath, id: SessionId) {
598 let addr = &system_path.socket_address();
599 self.network_status_port
600 .trigger(NetworkStatus::ConnectionClosed(system_path, id));
601 if let Some(Err(e)) = self
603 .net_bridge
604 .as_ref()
605 .map(|bridge| bridge.ack_closed(*addr))
606 {
607 error!(
608 self.ctx.log(),
609 "Bridge error while acking closed connection {:?}", e
610 );
611 }
612 self.connections.insert(*addr, ConnectionState::Closed(id));
613 if self.queue_manager.has_data(addr) {
614 self.retry_map.insert(*addr, 0);
615 }
616 }
617
618 fn connection_lost(&mut self, system_path: SystemPath, id: SessionId) {
619 let addr = &system_path.socket_address();
620 if !self.retry_map.contains_key(addr) {
621 warn!(self.ctx().log(), "connection lost to {:?}", addr);
622 self.retry_map.insert(*addr, 0); }
624 self.network_status_port
625 .trigger(NetworkStatus::ConnectionLost(system_path, id));
626 if let Some(Err(e)) = self
627 .net_bridge
628 .as_ref()
629 .map(|bridge| bridge.ack_closed(*addr))
630 {
631 error!(
632 self.ctx.log(),
633 "Bridge error while acking lost connection {:?}", e
634 );
635 }
636 self.connections.insert(*addr, ConnectionState::Lost(id));
637 }
638
639 fn route_local(&mut self, dst: ActorPath, msg: DispatchData) {
641 let lookup = self.lookup.load();
642 let lookup_result = lookup.get_by_actor_path(&dst);
643 match msg.into_local() {
644 Ok(netmsg) => match lookup_result {
645 LookupResult::Ref(actor) => {
646 actor.tell(netmsg);
647 }
648 LookupResult::Group(group) => {
649 group.route(netmsg, self.log());
650 }
651 LookupResult::None => {
652 error!(
653 self.ctx.log(),
654 "No local actor found at {:?}. Forwarding to DeadletterBox",
655 netmsg.receiver,
656 );
657 self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
658 }
659 LookupResult::Err(e) => {
660 error!(
661 self.ctx.log(),
662 "An error occurred during local actor lookup at {:?}. Forwarding to DeadletterBox. The error was: {}",
663 netmsg.receiver,
664 e
665 );
666 self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
667 }
668 },
669 Err(e) => {
670 error!(self.log(), "Could not serialise msg: {:?}. Dropping...", e);
671 }
672 }
673 }
674
675 fn route_remote_udp(
676 &mut self,
677 addr: SocketAddr,
678 data: DispatchData,
679 ) -> Result<(), NetworkBridgeError> {
680 if let Some(bridge) = &self.net_bridge {
681 bridge.route(addr, data, net::Protocol::Udp)?;
682 } else {
683 warn!(
684 self.ctx.log(),
685 "Dropping UDP message to {}, as bridge is not connected.", addr
686 );
687 }
688 Ok(())
689 }
690
691 fn route_remote_tcp(
692 &mut self,
693 addr: SocketAddr,
694 data: DispatchData,
695 ) -> Result<(), NetworkBridgeError> {
696 let state: &mut ConnectionState =
697 self.connections.entry(addr).or_insert(ConnectionState::New);
698 let next: Option<ConnectionState> = match *state {
699 ConnectionState::New => {
700 debug!(
701 self.ctx.log(),
702 "No connection found; establishing and queuing frame"
703 );
704 self.queue_manager.enqueue_data(data, addr);
705
706 if let Some(ref mut bridge) = self.net_bridge {
707 debug!(self.ctx.log(), "Establishing new connection to {:?}", addr);
708 self.retry_map.insert(addr, 0); bridge.connect(Transport::Tcp, addr).unwrap();
710 Some(ConnectionState::Initializing)
711 } else {
712 error!(self.ctx.log(), "No network bridge found; dropping message");
713 None
714 }
715 }
716 ConnectionState::Connected(_) => {
717 if self.queue_manager.has_data(&addr) {
718 self.queue_manager.enqueue_data(data, addr);
719
720 if let Some(bridge) = &self.net_bridge {
721 while let Some(queued_data) = self.queue_manager.pop_data(&addr) {
722 bridge.route(addr, queued_data, net::Protocol::Tcp)?;
723 }
724 }
725 None
726 } else {
727 if let Some(bridge) = &self.net_bridge {
729 bridge.route(addr, data, net::Protocol::Tcp)?;
730 }
731 None
732 }
733 }
734 ConnectionState::Initializing => {
735 self.queue_manager.enqueue_data(data, addr);
736 None
737 }
738 ConnectionState::Closed(_) => {
739 self.queue_manager.enqueue_data(data, addr);
740 if let Some(bridge) = &self.net_bridge {
741 self.retry_map.entry(addr).or_insert(0);
742 bridge.connect(Tcp, addr)?;
743 }
744 Some(ConnectionState::Initializing)
745 }
746 ConnectionState::Lost(_) => {
747 self.queue_manager.enqueue_data(data, addr);
749 None
750 }
751 ConnectionState::Blocked => {
752 warn!(
753 self.ctx.log(),
754 "Tried sending a message to a blocked connection: {:?}. Dropping message.",
755 addr
756 );
757 None
758 }
759 };
760
761 if let Some(next) = next {
762 *state = next;
763 }
764 Ok(())
765 }
766
767 fn resolve_path(&mut self, resolvable: &PathResolvable) -> Result<ActorPath, PathParseError> {
768 match resolvable {
769 PathResolvable::Path(actor_path) => Ok(actor_path.clone()),
770 PathResolvable::Alias(alias) => self
771 .system_path()
772 .into_named_with_string(alias)
773 .map(|p| p.into()),
774 PathResolvable::Segments(segments) => self
775 .system_path()
776 .into_named_with_vec(segments.to_vec())
777 .map(|p| p.into()),
778 PathResolvable::ActorId(id) => Ok(self.system_path().into_unique(*id).into()),
779 PathResolvable::System => Ok(self.deadletter_path()),
780 }
781 }
782
783 fn route(&mut self, dst: ActorPath, msg: DispatchData) -> Result<(), NetworkBridgeError> {
786 if self.system_path_ref() == dst.system() {
787 self.route_local(dst, msg);
788 Ok(())
789 } else {
790 let proto = dst.system().protocol();
791 match proto {
792 Transport::Local => {
793 self.route_local(dst, msg);
794 Ok(())
795 }
796 Transport::Tcp => {
797 let addr = SocketAddr::new(*dst.address(), dst.port());
798 self.route_remote_tcp(addr, msg)
799 }
800 Transport::Udp => {
801 let addr = SocketAddr::new(*dst.address(), dst.port());
802 self.route_remote_udp(addr, msg)
803 }
804 }
805 }
806 }
807
808 fn deadletter_path(&mut self) -> ActorPath {
809 ActorPath::Named(NamedPath::with_system(self.system_path(), Vec::new()))
810 }
811
812 fn register_actor(
813 &mut self,
814 registration: ActorRegistration,
815 update: bool,
816 promise: RegistrationPromise,
817 ) {
818 let ActorRegistration { actor, path } = registration;
819 let res = self
820 .resolve_path(&path)
821 .map_err(RegistrationError::InvalidPath)
822 .and_then(|ap| {
823 let lease = self.lookup.load();
824 if lease.contains(&path) && !update {
825 warn!(
826 self.ctx.log(),
827 "Detected duplicate path during registration. The path will not be re-registered"
828 );
829 drop(lease);
830 Err(RegistrationError::DuplicateEntry)
831 } else {
832 drop(lease);
833 let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
834 self.lookup.rcu(|current| {
835 let mut next = ActorStore::clone(current);
836 result = next.insert(path.clone(), actor.clone());
837 next
838 });
839 if let Ok(ref res) = result
840 && !res.is_empty()
841 {
842 info!(self.ctx.log(), "Replaced entry for path={:?}", path);
843 }
844 result.map(|_| ap)
845 .map_err(RegistrationError::InvalidPath)
846 }
847 });
848 if res.is_ok() && !self.reaper.is_scheduled() {
849 self.schedule_reaper();
850 }
851 debug!(self.log(), "Completed actor registration with {:?}", res);
852 match promise {
853 RegistrationPromise::Fulfil(promise) => {
854 promise.fulfil(res).unwrap_or_else(|e| {
855 error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
856 });
857 }
858 RegistrationPromise::None => (), }
860 }
861
862 fn register_policy(
863 &mut self,
864 registration: PolicyRegistration,
865 update: bool,
866 promise: RegistrationPromise,
867 ) {
868 let PolicyRegistration { policy, path } = registration;
869 let lease = self.lookup.load();
870 let path_res = PathResolvable::Segments(path);
871 let res = self
872 .resolve_path(&path_res)
873 .map_err(RegistrationError::InvalidPath)
874 .and_then(|ap| {
875 if lease.contains(&path_res) && !update {
876 warn!(
877 self.ctx.log(),
878 "Detected duplicate path during registration. The path will not be re-registered",
879 );
880 drop(lease);
881 Err(RegistrationError::DuplicateEntry)
882 } else {
883 drop(lease);
884 let_irrefutable!(path, PathResolvable::Segments(path) = path_res);
888 let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
889 self.lookup.rcu(|current| {
890 let mut next = ActorStore::clone(current);
891 result = next.set_routing_policy(&path, policy.clone());
892 next
893 });
894 if let Ok(ref res) = result
895 && !res.is_empty()
896 {
897 info!(self.ctx.log(), "Replaced entry for path={:?}", path);
898 }
899 result.map(|_| ap).map_err(RegistrationError::InvalidPath)
900 }
901 });
902 debug!(self.log(), "Completed policy registration with {:?}", res);
903 match promise {
904 RegistrationPromise::Fulfil(promise) => {
905 promise.fulfil(res).unwrap_or_else(|e| {
906 error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
907 });
908 }
909 RegistrationPromise::None => (), }
911 }
912
913 fn close_channel(&mut self, addr: SocketAddr) {
914 if let Some(state) = self.connections.get_mut(&addr) {
915 match state {
916 ConnectionState::Connected(session) => {
917 trace!(
918 self.ctx.log(),
919 "Closing channel to connected system {}, session {:?}", addr, session
920 );
921 if let Some(bridge) = &self.net_bridge {
922 while self.queue_manager.has_data(&addr) {
923 if let Some(Err(e)) = self
924 .queue_manager
925 .pop_data(&addr)
926 .map(|data| bridge.route(addr, data, Protocol::Tcp))
927 {
928 error!(self.ctx.log(), "Bridge error while routing {:?}", e);
929 }
930 }
931 if let Err(e) = bridge.close_channel(addr) {
932 error!(self.ctx.log(), "Bridge error closing channel {:?}", e);
933 }
934 }
935 }
936 _ => {
937 warn!(
938 self.ctx.log(),
939 "Trying to close channel to a system which is not connected {}", addr
940 );
941 }
942 }
943 } else {
944 warn!(self.ctx.log(), "Closing channel to unknown system {}", addr);
945 }
946 }
947}
948
949impl Actor for NetworkDispatcher {
950 type Message = DispatchEnvelope;
951
952 fn receive_local(&mut self, msg: Self::Message) -> HandlerResult {
953 match msg {
954 DispatchEnvelope::Msg { src: _, dst, msg } => {
955 if let Err(e) = self.route(dst, msg) {
956 error!(self.ctx.log(), "Failed to route message: {:?}", e);
957 };
958 }
959 DispatchEnvelope::ForwardedMsg { msg } => {
960 if let Err(e) = self.route(msg.receiver.clone(), DispatchData::NetMessage(msg)) {
962 error!(self.ctx.log(), "Failed to route message: {:?}", e);
963 };
964 }
965 DispatchEnvelope::Registration(reg) => {
966 trace!(self.log(), "Got registration request: {:?}", reg);
967 let RegistrationEnvelope {
968 event,
969 update,
970 promise,
971 } = reg;
972 match event {
973 RegistrationEvent::Actor(rea) => self.register_actor(rea, update, promise),
974 RegistrationEvent::Policy(rep) => self.register_policy(rep, update, promise),
975 }
976 }
977 DispatchEnvelope::Event(ev) => self.on_event(ev),
978 DispatchEnvelope::LockedChunk(trash) => self.garbage_buffers.push_back(trash),
979 }
980 Handled::OK
981 }
982
983 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
984 warn!(self.ctx.log(), "Received network message: {:?}", msg,);
985 Handled::OK
986 }
987}
988
989impl Dispatcher for NetworkDispatcher {
990 fn system_path(&mut self) -> SystemPath {
994 match self.system_path.as_ref() {
995 Some(path) => path.clone(),
996 None => {
997 let bound_addr = match self.net_bridge {
998 Some(ref net_bridge) => net_bridge
999 .local_addr()
1000 .expect("If net bridge is ready, port should be as well!"),
1001 None => panic!(
1002 "You must wait until the socket is bound before attempting to create a system path!"
1003 ),
1004 };
1005 let sp = SystemPath::new(self.cfg.transport, bound_addr.ip(), bound_addr.port());
1006 self.system_path = Some(sp.clone());
1007 sp
1008 }
1009 }
1010 }
1011}
1012
1013impl NetworkDispatcher {
1014 pub fn network_status_port(&mut self) -> &mut ProvidedPort<NetworkStatusPort> {
1016 &mut self.network_status_port
1017 }
1018}
1019
1020impl ComponentLifecycle for NetworkDispatcher {
1021 fn on_start(&mut self) -> HandlerResult {
1022 info!(self.ctx.log(), "Starting network...");
1023 self.start();
1024 info!(self.ctx.log(), "Started network just fine.");
1025 if let Some(promise) = self.notify_ready.take() {
1026 promise
1027 .complete()
1028 .unwrap_or_else(|e| error!(self.ctx.log(), "Could not start network! {:?}", e))
1029 }
1030 Handled::OK
1031 }
1032
1033 fn on_stop(&mut self) -> HandlerResult {
1034 info!(self.ctx.log(), "Stopping network...");
1035 self.stop();
1036 info!(self.ctx.log(), "Stopped network.");
1037 Handled::OK
1038 }
1039
1040 fn on_kill(&mut self) -> HandlerResult {
1041 info!(self.ctx.log(), "Killing network...");
1042 self.kill();
1043 info!(self.ctx.log(), "Killed network.");
1044 Handled::OK
1045 }
1046}
1047
1048impl Provide<NetworkStatusPort> for NetworkDispatcher {
1049 fn handle(&mut self, event: <NetworkStatusPort as Port>::Request) -> HandlerResult {
1050 debug!(
1051 self.ctx.log(),
1052 "Received NetworkStatusPort Request {:?}", event
1053 );
1054 match event {
1055 NetworkStatusRequest::DisconnectSystem(system_path) => {
1056 self.close_channel(system_path.socket_address());
1057 }
1058 NetworkStatusRequest::ConnectSystem(system_path) => {
1059 if let Some(bridge) = &self.net_bridge {
1060 bridge
1061 .connect(system_path.protocol(), system_path.socket_address())
1062 .unwrap();
1063 }
1064 }
1065 NetworkStatusRequest::BlockIp(ip_addr) => {
1066 debug!(self.ctx.log(), "Got BlockIp: {:?}", ip_addr);
1067 if let Some(bridge) = &self.net_bridge {
1068 bridge.block_ip(ip_addr).unwrap();
1069 }
1070 }
1071 NetworkStatusRequest::BlockSystem(system_path) => {
1072 debug!(self.ctx.log(), "Got BlockSystem: {:?}", system_path);
1073 if let Some(bridge) = &self.net_bridge {
1074 bridge.block_socket(system_path.socket_address()).unwrap();
1075 }
1076 }
1077 NetworkStatusRequest::BlockIpNet(ip_net) => {
1078 debug!(self.ctx.log(), "Got BlockIpNet: {:?}", ip_net);
1079 if let Some(bridge) = &self.net_bridge {
1080 bridge.block_ip_net(ip_net).unwrap();
1081 }
1082 }
1083 NetworkStatusRequest::AllowIp(ip_addr) => {
1084 debug!(self.ctx.log(), "Got AllowIp: {:?}", ip_addr);
1085 if let Some(bridge) = &self.net_bridge {
1086 bridge.allow_ip(ip_addr).unwrap();
1087 }
1088 }
1089 NetworkStatusRequest::AllowSystem(system_path) => {
1090 debug!(self.ctx.log(), "Got AllowSystem: {:?}", system_path);
1091 if let Some(bridge) = &self.net_bridge {
1092 bridge.allow_socket(system_path.socket_address()).unwrap();
1093 }
1094 }
1095 NetworkStatusRequest::AllowIpNet(ip_net) => {
1096 debug!(self.ctx.log(), "Got AllowIpNet: {:?}", ip_net);
1097 if let Some(bridge) = &self.net_bridge {
1098 bridge.allow_ip_net(ip_net).unwrap();
1099 }
1100 }
1101 }
1102 Handled::OK
1103 }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108 use super::{super::*, *};
1109 use crate::net_test_helpers::{PingerAct, PongerAct};
1110 use std::{thread, time::Duration};
1111
1112 #[test]
1113 fn failed_network() {
1114 let conflicting_socket =
1115 std::net::TcpListener::bind("127.0.0.1:0").expect("temporary conflicting listener");
1116 let conflicting_addr = conflicting_socket.local_addr().expect("listener address");
1117 let mut cfg = kompact::test_support::test_kompact_config();
1118 println!("Configuring network");
1119 cfg.system_components(DeadletterBox::new, {
1120 let net_config = NetworkConfig::new(conflicting_addr);
1121 net_config.build()
1122 });
1123 assert!(
1124 cfg.build().wait().is_err(),
1125 "network startup should fail while another listener owns the socket"
1126 );
1127 }
1128
1129 #[test]
1130 fn network_cleanup() {
1131 let mut cfg = kompact::test_support::test_kompact_config();
1132 println!("Configuring network");
1133 cfg.system_components(DeadletterBox::new, {
1134 let net_config =
1135 NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
1136 net_config.build()
1137 });
1138 println!("Starting KompactSystem");
1139 let system = cfg.build().wait().expect("KompactSystem");
1140 println!("KompactSystem started just fine.");
1141 let named_path = ActorPath::Named(NamedPath::with_system(
1142 system.system_path(),
1143 vec!["test".into()],
1144 ));
1145 println!("Got path: {}", named_path);
1146 let port = system.system_path().port();
1147 println!("Got port: {}", port);
1148 println!("Shutting down first system...");
1149 system
1150 .shutdown()
1151 .wait()
1152 .expect("KompactSystem failed to shut down!");
1153 println!("System shut down.");
1154 let mut cfg2 = kompact::test_support::test_kompact_config();
1155 println!("Configuring network");
1156 cfg2.system_components(DeadletterBox::new, {
1157 let net_config =
1158 NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
1159 net_config.build()
1160 });
1161 println!("Starting 2nd KompactSystem");
1162 let system2 = cfg2.build().wait().expect("KompactSystem");
1163 thread::sleep(Duration::from_millis(100));
1164 println!("2nd KompactSystem started just fine.");
1165 let named_path2 = ActorPath::Named(NamedPath::with_system(
1166 system2.system_path(),
1167 vec!["test".into()],
1168 ));
1169 println!("Got path: {}", named_path);
1170 assert_eq!(named_path, named_path2);
1171 system2
1172 .shutdown()
1173 .wait()
1174 .expect("2nd KompactSystem failed to shut down!");
1175 }
1176
1177 #[test]
1180 fn network_cleanup_with_timeout() {
1181 let mut cfg = kompact::test_support::test_kompact_config();
1182 println!("Configuring network");
1183 cfg.system_components(DeadletterBox::new, {
1184 let net_config =
1185 NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
1186 net_config.build()
1187 });
1188 println!("Starting KompactSystem");
1189 let system = cfg.build().wait().expect("KompactSystem");
1190 println!("KompactSystem started just fine.");
1191 let named_path = ActorPath::Named(NamedPath::with_system(
1192 system.system_path(),
1193 vec!["test".into()],
1194 ));
1195 println!("Got path: {}", named_path);
1196 let port = system.system_path().port();
1197 println!("Got port: {}", port);
1198
1199 thread::Builder::new()
1200 .name("System1 Killer".to_string())
1201 .spawn(move || {
1202 thread::sleep(Duration::from_millis(100));
1203 println!("Shutting down first system...");
1204 system
1205 .shutdown()
1206 .wait()
1207 .expect("KompactSystem failed to shut down!");
1208 println!("System shut down.");
1209 })
1210 .ok();
1211
1212 let mut cfg2 = kompact::test_support::test_kompact_config();
1213 println!("Configuring network");
1214 cfg2.system_components(DeadletterBox::new, {
1215 let net_config =
1216 NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
1217 net_config.build()
1218 });
1219 println!("Starting 2nd KompactSystem");
1220 let system2 = cfg2.build().wait().expect("KompactSystem");
1221 thread::sleep(Duration::from_millis(100));
1222 println!("2nd KompactSystem started just fine.");
1223 let named_path2 = ActorPath::Named(NamedPath::with_system(
1224 system2.system_path(),
1225 vec!["test".into()],
1226 ));
1227 println!("Got path: {}", named_path);
1228 assert_eq!(named_path, named_path2);
1229 system2
1230 .shutdown()
1231 .wait()
1232 .expect("2nd KompactSystem failed to shut down!");
1233 }
1234
1235 #[test]
1236 fn test_system_path_timing() {
1237 let mut cfg = kompact::test_support::test_kompact_config();
1238 println!("Configuring network");
1239 cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
1240 println!("Starting KompactSystem");
1241 let system = cfg.build().wait().expect("KompactSystem");
1242 println!("KompactSystem started just fine.");
1243 let named_path = ActorPath::Named(NamedPath::with_system(
1244 system.system_path(),
1245 vec!["test".into()],
1246 ));
1247 println!("Got path: {}", named_path);
1248 }
1250
1251 #[test]
1252 fn cleanup_bufferchunks_from_dead_actors() {
1256 let system1 = || {
1257 let mut cfg = kompact::test_support::test_kompact_config();
1258 cfg.system_components(
1259 DeadletterBox::new,
1260 NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work")).build(),
1261 );
1262 cfg.build().wait().expect("KompactSystem")
1263 };
1264 let system2 = |port| {
1265 let mut cfg = kompact::test_support::test_kompact_config();
1266 cfg.system_components(
1267 DeadletterBox::new,
1268 NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port)).build(),
1269 );
1270 cfg.build().wait().expect("KompactSystem")
1271 };
1272
1273 let system2a = system2(0);
1275 let port = system2a.system_path().port();
1276 let (ponger_named, ponf) = system2a.create_and_register(PongerAct::new_lazy);
1278 let poaf = system2a.register_by_alias(&ponger_named, "custom_name");
1279 ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1280 poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1281 let named_path = ActorPath::Named(NamedPath::with_system(
1282 system2a.system_path(),
1283 vec!["custom_name".into()],
1284 ));
1285 let named_path_clone = named_path;
1286 let system1: KompactSystem = system1();
1288 let (pinger_named, pinf) =
1289 system1.create_and_register(move || PingerAct::new_eager(named_path_clone));
1290 pinf.wait_expect(Duration::from_millis(1000), "Pinger failed to register!");
1291
1292 system2a.shutdown().wait().ok();
1294 system1.start(&pinger_named);
1296 thread::sleep(Duration::from_millis(100));
1298 system1.kill(pinger_named);
1300
1301 thread::sleep(Duration::from_millis(5000));
1303
1304 let mut garbage_len = 0;
1306 system1.with_dispatcher_definition(|dispatcher| {
1307 let dispatcher = dispatcher
1308 .downcast_mut::<NetworkDispatcher>()
1309 .expect("expected kompact-net NetworkDispatcher");
1310 garbage_len = dispatcher.garbage_buffers.len();
1311 });
1312 assert_ne!(0, garbage_len);
1313
1314 println!("Setting up system2b");
1316 let system2b = system2(port);
1317 let (ponger_named, ponf) = system2b.create_and_register(PongerAct::new_lazy);
1318 let poaf = system2b.register_by_alias(&ponger_named, "custom_name");
1319 ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1320 poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1321 println!("Starting actor on system2b");
1322 system2b.start(&ponger_named);
1323
1324 thread::sleep(Duration::from_millis(10000));
1327
1328 system1.with_dispatcher_definition(|dispatcher| {
1330 let dispatcher = dispatcher
1331 .downcast_mut::<NetworkDispatcher>()
1332 .expect("expected kompact-net NetworkDispatcher");
1333 garbage_len = dispatcher.garbage_buffers.len();
1334 });
1335 assert_eq!(0, garbage_len);
1336
1337 system1
1338 .shutdown()
1339 .wait()
1340 .expect("Kompact didn't shut down properly");
1341 system2b
1342 .shutdown()
1343 .wait()
1344 .expect("Kompact didn't shut down properly");
1345 }
1346}