Skip to main content

kompact_net/
dispatcher.rs

1use super::*;
2use crate::{
3    actors::{
4        Actor,
5        ActorPath,
6        Dispatcher,
7        DynActorRef,
8        NamedPath,
9        SystemPath,
10        Transport,
11        Transport::Tcp,
12    },
13    component::{Component, ComponentContext, ExecuteResult},
14    events::{NetworkDispatcherEvent, NetworkStatus, NetworkStatusPort, NetworkStatusRequest},
15    lookup::{ActorLookup, ActorStore, InsertResult, LookupResult},
16    messaging::{
17        ActorRegistration,
18        DispatchData,
19        DispatchEnvelope,
20        DispatchEvent,
21        MsgEnvelope,
22        NetMessage,
23        PathResolvable,
24        PolicyRegistration,
25        RegistrationEnvelope,
26        RegistrationError,
27        RegistrationEvent,
28        RegistrationPromise,
29    },
30    net::{ConnectionState, NetworkBridgeError, Protocol, SessionId, SocketAddr, buffers::*},
31    queue_manager::QueueManager,
32};
33use arc_swap::ArcSwap;
34use kompact::let_irrefutable;
35use rustc_hash::FxHashMap;
36use std::{collections::VecDeque, sync::Arc, time::Duration};
37
// Fallback values applied by `NetworkConfig` when the user does not override them.
mod defaults {
    /// Pause between two rounds of connection retries, in milliseconds.
    pub(crate) const RETRY_CONNECTIONS_INTERVAL: u64 = 5_000;
    /// How long to wait for the network layer to come up, in milliseconds.
    pub(crate) const BOOT_TIMEOUT: u64 = 5_000;
    /// Number of reconnection attempts before a connection is given up on.
    pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 10;
    /// Number of active connections above which connections start being closed gracefully.
    pub(crate) const SOFT_CONNECTION_LIMIT: u32 = 1_000;
    /// Number of connections above which new connection requests are rejected.
    pub(crate) const HARD_CONNECTION_LIMIT: u32 = 1_100;
}
46
// Hash map flavour used for the dispatcher's per-address bookkeeping (backed by `FxHashMap`).
type NetHashMap<K, V> = FxHashMap<K, V>;
48
/// Configuration builder for the network dispatcher
///
/// # Example
///
/// This example binds to local host on a free port chosen by the operating system.
///
/// ```
/// use kompact::prelude::*;
/// use kompact_net::NetworkConfig;
///
/// let mut conf = KompactConfig::default();
/// conf.system_components(DeadletterBox::new, NetworkConfig::default().build());
/// let system = conf.build().wait().expect("system");
/// # system.shutdown().wait().expect("shutdown");
/// ```
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// Socket address handed to the network bridge when the dispatcher starts.
    addr: SocketAddr,
    /// Transport protocol; every constructor in this file sets it to TCP.
    transport: Transport,
    /// Buffer configuration used by the network thread and the dispatcher.
    buffer_config: BufferConfig,
    /// Optional user-supplied chunk allocator; `None` unless one was given explicitly.
    custom_allocator: Option<Arc<dyn ChunkAllocator>>,
    /// Whether the Nagle algorithm is turned off for TCP channels (`true` by default).
    tcp_nodelay: bool,
    /// How many reconnection attempts are made before a connection is given up on.
    max_connection_retry_attempts: u8,
    /// Pause between reconnection attempts, in milliseconds.
    connection_retry_interval: u64,
    /// How long to wait for the network layer to set up, in milliseconds.
    boot_timeout: u64,
    /// Number of active connections above which least recently used channels are closed.
    soft_connection_limit: u32,
    /// Number of connections above which new connection requests are rejected.
    hard_connection_limit: u32,
}
77
78impl NetworkConfig {
79    /// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
80    /// NetworkDispatcher and NetworkThread will use the default `BufferConfig`
81    pub fn new(addr: SocketAddr) -> Self {
82        NetworkConfig {
83            addr,
84            transport: Transport::Tcp,
85            buffer_config: BufferConfig::default(),
86            custom_allocator: None,
87            tcp_nodelay: true,
88            max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
89            connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
90            boot_timeout: defaults::BOOT_TIMEOUT,
91            soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
92            hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
93        }
94    }
95
96    /// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
97    /// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors
98    pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
99        buffer_config.validate();
100        let mut cfg = NetworkConfig::new(addr);
101        cfg.set_buffer_config(buffer_config);
102        cfg
103    }
104
105    /// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
106    /// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors
107    pub fn with_custom_allocator(
108        addr: SocketAddr,
109        buffer_config: BufferConfig,
110        custom_allocator: Arc<dyn ChunkAllocator>,
111    ) -> Self {
112        buffer_config.validate();
113        NetworkConfig {
114            addr,
115            transport: Transport::Tcp,
116            buffer_config,
117            custom_allocator: Some(custom_allocator),
118            tcp_nodelay: true,
119            max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
120            connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
121            boot_timeout: defaults::BOOT_TIMEOUT,
122            soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
123            hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
124        }
125    }
126
127    /// Replace the current socket address with `addr`.
128    pub fn with_socket(mut self, addr: SocketAddr) -> Self {
129        self.addr = addr;
130        self
131    }
132
133    /// Complete the configuration and provide a function that produces a network dispatcher
134    ///
135    /// Returns the appropriate function type for use
136    /// with [system_components](KompactConfig::system_components).
137    pub fn build(self) -> impl Fn(KPromise<()>) -> NetworkDispatcher {
138        move |notify_ready| NetworkDispatcher::with_config(self.clone(), notify_ready)
139    }
140
141    /// Returns a pointer to the configurations [BufferConfig](net::buffers::BufferConfig).
142    pub fn get_buffer_config(&self) -> &BufferConfig {
143        &self.buffer_config
144    }
145
146    /// Sets the configurations [BufferConfig](net::buffers::BufferConfig) to `buffer_config`
147    pub fn set_buffer_config(&mut self, buffer_config: BufferConfig) {
148        self.buffer_config = buffer_config;
149    }
150
151    /// Returns a pointer to the `CustomAllocator` option so that it can be cloned by the caller.
152    pub fn get_custom_allocator(&self) -> &Option<Arc<dyn ChunkAllocator>> {
153        &self.custom_allocator
154    }
155
156    /// Reads the `tcp_nodelay` parameter of the [NetworkConfig](NetworkConfig).
157    pub fn get_tcp_nodelay(&self) -> bool {
158        self.tcp_nodelay
159    }
160
161    /// If set to `true` the Nagle algorithm will be turned off for all Tcp Network-channels.
162    ///
163    /// Decreases network-latency at the cost of reduced throughput and increased congestion.
164    ///
165    /// Default value is `false`, i.e. the Nagle algorithm is turned on by default.
166    pub fn set_tcp_nodelay(&mut self, nodelay: bool) {
167        self.tcp_nodelay = nodelay;
168    }
169
170    /// Configures how many attempts at re-establishing a connection will be made before giving up
171    /// and discarding the enqueued outgoing messages.
172    ///
173    /// Default value is 10 times.
174    pub fn set_max_connection_retry_attempts(&mut self, count: u8) {
175        self.max_connection_retry_attempts = count;
176    }
177
178    /// Returns the number of times the system will retry before giving up on a connection.
179    pub fn get_max_connection_retry_attempts(&self) -> u8 {
180        self.max_connection_retry_attempts
181    }
182
183    /// Configures how long to wait (in ms) between attempts at establishing a connection.
184    ///
185    /// Default value is 5000 ms.
186    pub fn set_connection_retry_interval(&mut self, milliseconds: u64) {
187        self.connection_retry_interval = milliseconds;
188    }
189
190    /// How long (in ms) the system will wait between attempts at re-establishing connection.
191    pub fn get_connection_retry_interval(&self) -> u64 {
192        self.connection_retry_interval
193    }
194
195    /// Configures how long the system will wait (in ms) for the network layer to set-up
196    ///
197    /// Default value is 5000 ms.
198    pub fn set_boot_timeout(&mut self, milliseconds: u64) {
199        self.boot_timeout = milliseconds;
200    }
201
202    /// How long (in ms) the system will wait (in ms) for the network layer to set-up
203    pub fn get_boot_timeout(&self) -> u64 {
204        self.boot_timeout
205    }
206
207    /// Configures how many concurrent Network-connections may be active at any point in time
208    ///
209    /// When the limit is exceeded the system will *gracefully close* the least recently used channel.
210    ///
211    /// Default value is 1000 Connections.
212    pub fn set_soft_connection_limit(&mut self, limit: u32) {
213        self.soft_connection_limit = limit;
214    }
215
216    /// How many Active Network-connections the system will allow before it starts closing least recently used
217    pub fn get_soft_connection_limit(&self) -> u32 {
218        self.soft_connection_limit
219    }
220
221    /// Configures how many concurrent Network-connections the system may have at any point.
222    ///
223    /// When the limit is exceeded the system will reject all incoming and outgoing requests for new
224    /// connections.
225    ///
226    /// Default value is 1100 Connections.
227    pub fn set_hard_connection_limit(&mut self, limit: u32) {
228        self.hard_connection_limit = limit;
229    }
230
231    /// How many Network-connections the system will allow before it starts closing least recently used
232    pub fn get_hard_connection_limit(&self) -> u32 {
233        self.hard_connection_limit
234    }
235}
236
237/// Socket defaults to `127.0.0.1:0` (i.e. a random local port) and protocol is [TCP](Transport::Tcp)
238impl Default for NetworkConfig {
239    fn default() -> Self {
240        NetworkConfig {
241            addr: "127.0.0.1:0".parse().unwrap(),
242            transport: Transport::Tcp,
243            buffer_config: BufferConfig::default(),
244            custom_allocator: None,
245            tcp_nodelay: true,
246            max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
247            connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
248            boot_timeout: defaults::BOOT_TIMEOUT,
249            soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
250            hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
251        }
252    }
253}
254
/// A network-capable dispatcher for sending messages to remote actors
///
/// Construct this using [NetworkConfig](NetworkConfig::build).
///
/// This dispatcher automatically creates channels to requested target
/// systems on demand and maintains them while in use.
///
/// The current implementation only supports [TCP](Transport::Tcp) as
/// a transport protocol.
///
/// If possible, this implementation will "reflect" messages
/// to local actors directly back up, instead of serialising them first.
#[derive(ComponentDefinition)]
pub struct NetworkDispatcher {
    /// Component context of the dispatcher; created uninitialised in `with_config`.
    ctx: ComponentContext<NetworkDispatcher>,
    /// Local map of connection statuses
    connections: NetHashMap<SocketAddr, ConnectionState>,
    /// Network configuration for this dispatcher
    cfg: NetworkConfig,
    /// Shared lookup structure for mapping [actor paths](ActorPath) and [actor refs](ActorRef)
    lookup: Arc<ArcSwap<ActorStore>>,
    // Fields initialized at [Start](ControlEvent::Start) – they require ComponentContextual awareness
    /// Bridge into asynchronous networking layer
    net_bridge: Option<net::Bridge>,
    /// A cached version of the bound system path
    system_path: Option<SystemPath>,
    /// Management for queuing Frames during network unavailability (conn. init. and MPSC unreadiness)
    queue_manager: QueueManager,
    /// Reaper which cleans up deregistered actor references in the actor lookup table
    reaper: lookup::gc::ActorRefReaper,
    /// Promise handed in at construction time.
    // NOTE(review): presumably fulfilled once the network layer is ready; the fulfilment is outside this view.
    notify_ready: Option<KPromise<()>>,
    /// Stores the number of retry-attempts for connections. Checked and incremented periodically
    /// by `schedule_retries`.
    retry_map: FxHashMap<SocketAddr, u8>,
    /// Buffer chunks waiting to be freed; `schedule_reaper` retries `free()` on each of them
    /// and keeps the ones that could not be freed yet.
    garbage_buffers: VecDeque<BufferChunk>,
    /// The dispatcher emits NetworkStatusUpdates to the `NetworkStatusPort`.
    network_status_port: ProvidedPort<NetworkStatusPort>,
}
292
293impl NetworkDispatcher {
294    /// Create a new dispatcher with the default configuration
295    ///
296    /// See also [NetworkConfig](NetworkConfig).
297    ///
298    /// # Example
299    ///
300    /// This example binds to local host on a free port chosen by the operating system.
301    ///
302    /// ```
303    /// use kompact::prelude::*;
304    /// use kompact_net::NetworkDispatcher;
305    ///
306    /// let mut conf = KompactConfig::default();
307    /// conf.system_components(DeadletterBox::new, NetworkDispatcher::new);
308    /// let system = conf.build().wait().expect("system");
309    /// # system.shutdown().wait().expect("shutdown");
310    /// ```
311    pub fn new(notify_ready: KPromise<()>) -> Self {
312        let config = NetworkConfig::default();
313        NetworkDispatcher::with_config(config, notify_ready)
314    }
315
316    /// Create a new dispatcher with the given configuration
317    ///
318    /// For better readability in combination with [system_components](KompactConfig::system_components),
319    /// use [NetworkConfig::build](NetworkConfig::build) instead.
320    pub fn with_config(cfg: NetworkConfig, notify_ready: KPromise<()>) -> Self {
321        let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new()));
322        // Just a temporary assignment...will be replaced from config on start
323        let reaper = lookup::gc::ActorRefReaper::default();
324
325        NetworkDispatcher {
326            ctx: ComponentContext::uninitialised(),
327            connections: Default::default(),
328            cfg,
329            lookup,
330            net_bridge: None,
331            system_path: None,
332            queue_manager: QueueManager::new(),
333            reaper,
334            notify_ready: Some(notify_ready),
335            garbage_buffers: VecDeque::new(),
336            retry_map: Default::default(),
337            network_status_port: ProvidedPort::uninitialised(),
338        }
339    }
340
341    /// Return a reference to the cached system path
342    ///
343    /// Mutable, since it will update the cached value, if necessary.
344    pub fn system_path_ref(&mut self) -> &SystemPath {
345        if self.system_path.is_none() {
346            let _ = self.system_path(); // fill the cache
347        }
348        self.system_path
349            .as_ref()
350            .expect("Cached value should have been filled by calling self.system_path()!")
351    }
352
353    fn start(&mut self) {
354        debug!(self.ctx.log(), "Starting self and network bridge");
355        self.reaper = lookup::gc::ActorRefReaper::from_config(self.ctx.config());
356        self.start_bridge(self.cfg.addr);
357
358        let deadletter: DynActorRef = self.ctx.system().deadletter_ref().dyn_ref();
359        self.lookup.rcu(|current| {
360            let mut next = ActorStore::clone(current);
361            next.insert(PathResolvable::System, deadletter.clone())
362                .expect("Deadletter shouldn't error");
363            next
364        });
365
366        self.schedule_retries();
367    }
368
369    fn start_bridge(&mut self, address: SocketAddr) {
370        let dispatcher = self
371            .actor_ref()
372            .hold()
373            .expect("Self can hardly be deallocated!");
374        let bridge_logger = self.ctx.log().new(o!("owner" => "Bridge"));
375        let network_thread_logger = self.ctx.log().new(o!("owner" => "NetworkThread"));
376        let (mut bridge, _addr) = net::Bridge::new(
377            self.lookup.clone(),
378            network_thread_logger,
379            bridge_logger,
380            address,
381            dispatcher.clone(),
382            &self.cfg,
383        );
384        bridge.set_dispatcher(dispatcher);
385        self.net_bridge = Some(bridge);
386    }
387
388    fn handle_network_failure(&mut self) {
389        self.network_status_port
390            .trigger(NetworkStatus::CriticalNetworkFailure);
391        let faulty_bridge = self.net_bridge.take();
392        let connections: Vec<(SocketAddr, ConnectionState)> = self.connections.drain().collect();
393        for (address, state) in connections {
394            if let ConnectionState::Connected(id) = state {
395                self.connection_lost(SystemPath::with_socket(Transport::Tcp, address), id);
396            } else {
397                self.connections.insert(address, state);
398            }
399        }
400        // Start a new bridge, try to use the same bound IP as the old one was bound to
401        let bound_address = faulty_bridge
402            .map(|b| b.local_addr().unwrap_or(self.cfg.addr))
403            .unwrap_or(self.cfg.addr);
404        self.start_bridge(bound_address);
405    }
406
407    fn stop(&mut self) {
408        if let Some(Err(e)) = self.net_bridge.take().map(|bridge| bridge.stop()) {
409            error!(
410                self.ctx().log(),
411                "NetworkBridge did not shut down as expected! Error was:\n     {:?}\n", e
412            );
413        }
414    }
415
416    fn kill(&mut self) {
417        if let Some(Err(e)) = self.net_bridge.take().map(|bridge| bridge.kill()) {
418            error!(
419                self.ctx().log(),
420                "NetworkBridge did not shut down as expected! Error was:\n     {:?}\n", e
421            );
422        }
423    }
424
425    fn schedule_reaper(&mut self) {
426        if !self.reaper.is_scheduled() {
427            // First time running; mark as scheduled and jump straight to scheduling
428            self.reaper.schedule();
429        } else {
430            // Repeated schedule; prune deallocated ActorRefs and update strategy accordingly
431            let num_reaped = self.reaper.run(&self.lookup);
432            if num_reaped == 0 {
433                // No work done; slow down interval
434                self.reaper.strategy_mut().incr();
435            } else {
436                self.reaper.strategy_mut().decr();
437            }
438        }
439        let next_wakeup = self.reaper.strategy().curr();
440        debug!(
441            self.ctx().log(),
442            "Scheduling reaping at {:?}ms", next_wakeup
443        );
444
445        let mut retry_queue = VecDeque::new();
446        for mut trash in self.garbage_buffers.drain(..) {
447            if !trash.free() {
448                retry_queue.push_back(trash);
449            }
450        }
451        // info!(self.ctx().log(), "tried to clean {} buffer(s)", retry_queue.len()); // manual verification in testing
452        self.garbage_buffers.append(&mut retry_queue);
453
454        self.schedule_once(Duration::from_millis(next_wakeup), move |target, _id| {
455            target.schedule_reaper();
456            Handled::OK
457        });
458    }
459
460    fn schedule_retries(&mut self) {
461        // First check the retry_map if we should re-request connections
462        let drain = self.retry_map.clone();
463        self.retry_map.clear();
464        for (addr, retry) in drain {
465            if retry < self.cfg.max_connection_retry_attempts {
466                // Make sure we will re-request connection later
467                self.retry_map.insert(addr, retry + 1);
468                if let Some(bridge) = &self.net_bridge {
469                    // Do connection attempt
470                    debug!(
471                        self.ctx().log(),
472                        "Dispatcher retrying connection to host {}, attempt {}/{}",
473                        addr,
474                        retry,
475                        self.cfg.max_connection_retry_attempts
476                    );
477                    bridge.connect(Transport::Tcp, addr).unwrap();
478                }
479            } else {
480                // Too many retries, give up on the connection.
481                info!(
482                    self.ctx().log(),
483                    "Dispatcher giving up on remote host {}, dropping queues", addr
484                );
485                self.queue_manager.drop_queue(&addr);
486                self.connections.remove(&addr);
487                self.network_status_port
488                    .trigger(NetworkStatus::ConnectionDropped(SystemPath::with_socket(
489                        Transport::Tcp,
490                        addr,
491                    )));
492            }
493        }
494        self.schedule_once(
495            Duration::from_millis(self.cfg.connection_retry_interval),
496            move |target, _id| {
497                target.schedule_retries();
498                Handled::OK
499            },
500        );
501    }
502
503    fn on_event(&mut self, ev: Box<dyn DispatchEvent>) {
504        let ev = match ev.into_any().downcast::<NetworkDispatcherEvent>() {
505            Ok(ev) => *ev,
506            Err(_) => {
507                warn!(
508                    self.ctx.log(),
509                    "Ignoring unexpected dispatcher event in NetworkDispatcher",
510                );
511                return;
512            }
513        };
514        match ev {
515            NetworkDispatcherEvent::Network(ev) => match ev {
516                NetworkStatus::ConnectionEstablished(system_path, session) => {
517                    self.connection_established(system_path, session)
518                }
519                NetworkStatus::ConnectionLost(system_path, session) => {
520                    self.connection_lost(system_path, session)
521                }
522                NetworkStatus::ConnectionClosed(system_path, session) => {
523                    self.connection_closed(system_path, session)
524                }
525                NetworkStatus::ConnectionDropped(system_path) => {
526                    let _ = self.retry_map.remove(&system_path.socket_address());
527                    self.network_status_port
528                        .trigger(NetworkStatus::ConnectionDropped(system_path));
529                }
530                NetworkStatus::BlockedSystem(system_path) => {
531                    self.connections
532                        .insert(system_path.socket_address(), ConnectionState::Blocked);
533                    self.network_status_port
534                        .trigger(NetworkStatus::BlockedSystem(system_path));
535                }
536                NetworkStatus::BlockedIp(ip_addr) => {
537                    self.network_status_port
538                        .trigger(NetworkStatus::BlockedIp(ip_addr));
539                }
540                NetworkStatus::BlockedIpNet(ip_net) => {
541                    self.network_status_port
542                        .trigger(NetworkStatus::BlockedIpNet(ip_net));
543                }
544                NetworkStatus::AllowedSystem(system_path) => {
545                    self.connections.remove(&system_path.socket_address());
546                    self.network_status_port
547                        .trigger(NetworkStatus::AllowedSystem(system_path));
548                }
549                NetworkStatus::AllowedIp(ip_addr) => {
550                    self.network_status_port
551                        .trigger(NetworkStatus::AllowedIp(ip_addr));
552                }
553                NetworkStatus::AllowedIpNet(ip_net) => {
554                    self.network_status_port
555                        .trigger(NetworkStatus::AllowedIpNet(ip_net));
556                }
557                NetworkStatus::SoftConnectionLimitExceeded => self
558                    .network_status_port
559                    .trigger(NetworkStatus::SoftConnectionLimitExceeded),
560                NetworkStatus::HardConnectionLimitReached => self
561                    .network_status_port
562                    .trigger(NetworkStatus::HardConnectionLimitReached),
563                NetworkStatus::CriticalNetworkFailure => self.handle_network_failure(),
564            },
565            NetworkDispatcherEvent::RejectedData((addr, data)) => {
566                // These are messages which we routed to a network-thread before they lost the connection.
567                self.queue_manager.enqueue_priority_data(*data, addr);
568                self.retry_map.entry(addr).or_insert(0);
569            }
570        }
571    }
572
573    fn connection_established(&mut self, system_path: SystemPath, id: SessionId) {
574        info!(
575            self.ctx().log(),
576            "registering newly connected conn at {:?}", system_path
577        );
578        let addr = &system_path.socket_address();
579        self.network_status_port
580            .trigger(NetworkStatus::ConnectionEstablished(system_path, id));
581        let _ = self.retry_map.remove(addr);
582        if self.queue_manager.has_data(addr) {
583            // Drain as much as possible
584            while let Some(frame) = self.queue_manager.pop_data(addr) {
585                if let Some(bridge) = &self.net_bridge {
586                    //println!("Sending queued frame to newly established connection");
587                    if let Err(e) = bridge.route(*addr, frame, net::Protocol::Tcp) {
588                        error!(self.ctx.log(), "Bridge error while routing {:?}", e);
589                    }
590                }
591            }
592        }
593        self.connections
594            .insert(*addr, ConnectionState::Connected(id));
595    }
596
597    fn connection_closed(&mut self, system_path: SystemPath, id: SessionId) {
598        let addr = &system_path.socket_address();
599        self.network_status_port
600            .trigger(NetworkStatus::ConnectionClosed(system_path, id));
601        // Ack the closing
602        if let Some(Err(e)) = self
603            .net_bridge
604            .as_ref()
605            .map(|bridge| bridge.ack_closed(*addr))
606        {
607            error!(
608                self.ctx.log(),
609                "Bridge error while acking closed connection {:?}", e
610            );
611        }
612        self.connections.insert(*addr, ConnectionState::Closed(id));
613        if self.queue_manager.has_data(addr) {
614            self.retry_map.insert(*addr, 0);
615        }
616    }
617
618    fn connection_lost(&mut self, system_path: SystemPath, id: SessionId) {
619        let addr = &system_path.socket_address();
620        if !self.retry_map.contains_key(addr) {
621            warn!(self.ctx().log(), "connection lost to {:?}", addr);
622            self.retry_map.insert(*addr, 0); // Make sure we try to re-establish the connection
623        }
624        self.network_status_port
625            .trigger(NetworkStatus::ConnectionLost(system_path, id));
626        if let Some(Err(e)) = self
627            .net_bridge
628            .as_ref()
629            .map(|bridge| bridge.ack_closed(*addr))
630        {
631            error!(
632                self.ctx.log(),
633                "Bridge error while acking lost connection {:?}", e
634            );
635        }
636        self.connections.insert(*addr, ConnectionState::Lost(id));
637    }
638
639    /// Forwards `msg` up to a local `dst` actor, if it exists.
640    fn route_local(&mut self, dst: ActorPath, msg: DispatchData) {
641        let lookup = self.lookup.load();
642        let lookup_result = lookup.get_by_actor_path(&dst);
643        match msg.into_local() {
644            Ok(netmsg) => match lookup_result {
645                LookupResult::Ref(actor) => {
646                    actor.tell(netmsg);
647                }
648                LookupResult::Group(group) => {
649                    group.route(netmsg, self.log());
650                }
651                LookupResult::None => {
652                    error!(
653                        self.ctx.log(),
654                        "No local actor found at {:?}. Forwarding to DeadletterBox",
655                        netmsg.receiver,
656                    );
657                    self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
658                }
659                LookupResult::Err(e) => {
660                    error!(
661                        self.ctx.log(),
662                        "An error occurred during local actor lookup at {:?}. Forwarding to DeadletterBox. The error was: {}",
663                        netmsg.receiver,
664                        e
665                    );
666                    self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
667                }
668            },
669            Err(e) => {
670                error!(self.log(), "Could not serialise msg: {:?}. Dropping...", e);
671            }
672        }
673    }
674
675    fn route_remote_udp(
676        &mut self,
677        addr: SocketAddr,
678        data: DispatchData,
679    ) -> Result<(), NetworkBridgeError> {
680        if let Some(bridge) = &self.net_bridge {
681            bridge.route(addr, data, net::Protocol::Udp)?;
682        } else {
683            warn!(
684                self.ctx.log(),
685                "Dropping UDP message to {}, as bridge is not connected.", addr
686            );
687        }
688        Ok(())
689    }
690
691    fn route_remote_tcp(
692        &mut self,
693        addr: SocketAddr,
694        data: DispatchData,
695    ) -> Result<(), NetworkBridgeError> {
696        let state: &mut ConnectionState =
697            self.connections.entry(addr).or_insert(ConnectionState::New);
698        let next: Option<ConnectionState> = match *state {
699            ConnectionState::New => {
700                debug!(
701                    self.ctx.log(),
702                    "No connection found; establishing and queuing frame"
703                );
704                self.queue_manager.enqueue_data(data, addr);
705
706                if let Some(ref mut bridge) = self.net_bridge {
707                    debug!(self.ctx.log(), "Establishing new connection to {:?}", addr);
708                    self.retry_map.insert(addr, 0); // Make sure we will re-request connection later
709                    bridge.connect(Transport::Tcp, addr).unwrap();
710                    Some(ConnectionState::Initializing)
711                } else {
712                    error!(self.ctx.log(), "No network bridge found; dropping message");
713                    None
714                }
715            }
716            ConnectionState::Connected(_) => {
717                if self.queue_manager.has_data(&addr) {
718                    self.queue_manager.enqueue_data(data, addr);
719
720                    if let Some(bridge) = &self.net_bridge {
721                        while let Some(queued_data) = self.queue_manager.pop_data(&addr) {
722                            bridge.route(addr, queued_data, net::Protocol::Tcp)?;
723                        }
724                    }
725                    None
726                } else {
727                    // Send frame
728                    if let Some(bridge) = &self.net_bridge {
729                        bridge.route(addr, data, net::Protocol::Tcp)?;
730                    }
731                    None
732                }
733            }
734            ConnectionState::Initializing => {
735                self.queue_manager.enqueue_data(data, addr);
736                None
737            }
738            ConnectionState::Closed(_) => {
739                self.queue_manager.enqueue_data(data, addr);
740                if let Some(bridge) = &self.net_bridge {
741                    self.retry_map.entry(addr).or_insert(0);
742                    bridge.connect(Tcp, addr)?;
743                }
744                Some(ConnectionState::Initializing)
745            }
746            ConnectionState::Lost(_) => {
747                // May be recovered...
748                self.queue_manager.enqueue_data(data, addr);
749                None
750            }
751            ConnectionState::Blocked => {
752                warn!(
753                    self.ctx.log(),
754                    "Tried sending a message to a blocked connection: {:?}. Dropping message.",
755                    addr
756                );
757                None
758            }
759        };
760
761        if let Some(next) = next {
762            *state = next;
763        }
764        Ok(())
765    }
766
767    fn resolve_path(&mut self, resolvable: &PathResolvable) -> Result<ActorPath, PathParseError> {
768        match resolvable {
769            PathResolvable::Path(actor_path) => Ok(actor_path.clone()),
770            PathResolvable::Alias(alias) => self
771                .system_path()
772                .into_named_with_string(alias)
773                .map(|p| p.into()),
774            PathResolvable::Segments(segments) => self
775                .system_path()
776                .into_named_with_vec(segments.to_vec())
777                .map(|p| p.into()),
778            PathResolvable::ActorId(id) => Ok(self.system_path().into_unique(*id).into()),
779            PathResolvable::System => Ok(self.deadletter_path()),
780        }
781    }
782
783    /// Forwards `msg` to destination described by `dst`, routing it across the network
784    /// if needed.
785    fn route(&mut self, dst: ActorPath, msg: DispatchData) -> Result<(), NetworkBridgeError> {
786        if self.system_path_ref() == dst.system() {
787            self.route_local(dst, msg);
788            Ok(())
789        } else {
790            let proto = dst.system().protocol();
791            match proto {
792                Transport::Local => {
793                    self.route_local(dst, msg);
794                    Ok(())
795                }
796                Transport::Tcp => {
797                    let addr = SocketAddr::new(*dst.address(), dst.port());
798                    self.route_remote_tcp(addr, msg)
799                }
800                Transport::Udp => {
801                    let addr = SocketAddr::new(*dst.address(), dst.port());
802                    self.route_remote_udp(addr, msg)
803                }
804            }
805        }
806    }
807
808    fn deadletter_path(&mut self) -> ActorPath {
809        ActorPath::Named(NamedPath::with_system(self.system_path(), Vec::new()))
810    }
811
812    fn register_actor(
813        &mut self,
814        registration: ActorRegistration,
815        update: bool,
816        promise: RegistrationPromise,
817    ) {
818        let ActorRegistration { actor, path } = registration;
819        let res = self
820            .resolve_path(&path)
821            .map_err(RegistrationError::InvalidPath)
822            .and_then(|ap| {
823                let lease = self.lookup.load();
824                if lease.contains(&path) && !update {
825                    warn!(
826                        self.ctx.log(),
827                        "Detected duplicate path during registration. The path will not be re-registered"
828                    );
829                    drop(lease);
830                    Err(RegistrationError::DuplicateEntry)
831                } else {
832                    drop(lease);
833                    let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
834                    self.lookup.rcu(|current| {
835                        let mut next = ActorStore::clone(current);
836                        result = next.insert(path.clone(), actor.clone());
837                        next
838                    });
839                    if let Ok(ref res) = result
840                        && !res.is_empty()
841                    {
842                        info!(self.ctx.log(), "Replaced entry for path={:?}", path);
843                    }
844                    result.map(|_| ap)
845                        .map_err(RegistrationError::InvalidPath)
846                }
847            });
848        if res.is_ok() && !self.reaper.is_scheduled() {
849            self.schedule_reaper();
850        }
851        debug!(self.log(), "Completed actor registration with {:?}", res);
852        match promise {
853            RegistrationPromise::Fulfil(promise) => {
854                promise.fulfil(res).unwrap_or_else(|e| {
855                    error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
856                });
857            }
858            RegistrationPromise::None => (), // ignore
859        }
860    }
861
862    fn register_policy(
863        &mut self,
864        registration: PolicyRegistration,
865        update: bool,
866        promise: RegistrationPromise,
867    ) {
868        let PolicyRegistration { policy, path } = registration;
869        let lease = self.lookup.load();
870        let path_res = PathResolvable::Segments(path);
871        let res = self
872            .resolve_path(&path_res)
873            .map_err(RegistrationError::InvalidPath)
874            .and_then(|ap| {
875                if lease.contains(&path_res) && !update {
876                    warn!(
877                        self.ctx.log(),
878                        "Detected duplicate path during registration. The path will not be re-registered",
879                    );
880                    drop(lease);
881                    Err(RegistrationError::DuplicateEntry)
882                } else {
883                    drop(lease);
884                    //let PathResolvable::Segments(path) = path_res;
885                    // This should work, we just assigned it above, 
886                    // but the Rust compiler can't figure that out
887                    let_irrefutable!(path, PathResolvable::Segments(path) = path_res);
888                    let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
889                    self.lookup.rcu(|current| {
890                        let mut next = ActorStore::clone(current);
891                        result = next.set_routing_policy(&path, policy.clone());
892                        next
893                    });
894                    if let Ok(ref res) = result
895                        && !res.is_empty()
896                    {
897                        info!(self.ctx.log(), "Replaced entry for path={:?}", path);
898                    }
899                    result.map(|_| ap).map_err(RegistrationError::InvalidPath)
900                }
901            });
902        debug!(self.log(), "Completed policy registration with {:?}", res);
903        match promise {
904            RegistrationPromise::Fulfil(promise) => {
905                promise.fulfil(res).unwrap_or_else(|e| {
906                    error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
907                });
908            }
909            RegistrationPromise::None => (), // ignore
910        }
911    }
912
913    fn close_channel(&mut self, addr: SocketAddr) {
914        if let Some(state) = self.connections.get_mut(&addr) {
915            match state {
916                ConnectionState::Connected(session) => {
917                    trace!(
918                        self.ctx.log(),
919                        "Closing channel to connected system {}, session {:?}", addr, session
920                    );
921                    if let Some(bridge) = &self.net_bridge {
922                        while self.queue_manager.has_data(&addr) {
923                            if let Some(Err(e)) = self
924                                .queue_manager
925                                .pop_data(&addr)
926                                .map(|data| bridge.route(addr, data, Protocol::Tcp))
927                            {
928                                error!(self.ctx.log(), "Bridge error while routing {:?}", e);
929                            }
930                        }
931                        if let Err(e) = bridge.close_channel(addr) {
932                            error!(self.ctx.log(), "Bridge error closing channel {:?}", e);
933                        }
934                    }
935                }
936                _ => {
937                    warn!(
938                        self.ctx.log(),
939                        "Trying to close channel to a system which is not connected {}", addr
940                    );
941                }
942            }
943        } else {
944            warn!(self.ctx.log(), "Closing channel to unknown system {}", addr);
945        }
946    }
947}
948
949impl Actor for NetworkDispatcher {
950    type Message = DispatchEnvelope;
951
952    fn receive_local(&mut self, msg: Self::Message) -> HandlerResult {
953        match msg {
954            DispatchEnvelope::Msg { src: _, dst, msg } => {
955                if let Err(e) = self.route(dst, msg) {
956                    error!(self.ctx.log(), "Failed to route message: {:?}", e);
957                };
958            }
959            DispatchEnvelope::ForwardedMsg { msg } => {
960                // Look up destination (local or remote), then route or err
961                if let Err(e) = self.route(msg.receiver.clone(), DispatchData::NetMessage(msg)) {
962                    error!(self.ctx.log(), "Failed to route message: {:?}", e);
963                };
964            }
965            DispatchEnvelope::Registration(reg) => {
966                trace!(self.log(), "Got registration request: {:?}", reg);
967                let RegistrationEnvelope {
968                    event,
969                    update,
970                    promise,
971                } = reg;
972                match event {
973                    RegistrationEvent::Actor(rea) => self.register_actor(rea, update, promise),
974                    RegistrationEvent::Policy(rep) => self.register_policy(rep, update, promise),
975                }
976            }
977            DispatchEnvelope::Event(ev) => self.on_event(ev),
978            DispatchEnvelope::LockedChunk(trash) => self.garbage_buffers.push_back(trash),
979        }
980        Handled::OK
981    }
982
983    fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
984        warn!(self.ctx.log(), "Received network message: {:?}", msg,);
985        Handled::OK
986    }
987}
988
989impl Dispatcher for NetworkDispatcher {
990    /// Generates a [SystemPath](SystemPath) from this dispatcher's configuration
991    ///
992    /// This is only possible after the socket is bound and will panic if attempted earlier!
993    fn system_path(&mut self) -> SystemPath {
994        match self.system_path.as_ref() {
995            Some(path) => path.clone(),
996            None => {
997                let bound_addr = match self.net_bridge {
998                    Some(ref net_bridge) => net_bridge
999                        .local_addr()
1000                        .expect("If net bridge is ready, port should be as well!"),
1001                    None => panic!(
1002                        "You must wait until the socket is bound before attempting to create a system path!"
1003                    ),
1004                };
1005                let sp = SystemPath::new(self.cfg.transport, bound_addr.ip(), bound_addr.port());
1006                self.system_path = Some(sp.clone());
1007                sp
1008            }
1009        }
1010    }
1011}
1012
1013impl NetworkDispatcher {
1014    /// Returns the provided network status port for this dispatcher.
1015    pub fn network_status_port(&mut self) -> &mut ProvidedPort<NetworkStatusPort> {
1016        &mut self.network_status_port
1017    }
1018}
1019
1020impl ComponentLifecycle for NetworkDispatcher {
1021    fn on_start(&mut self) -> HandlerResult {
1022        info!(self.ctx.log(), "Starting network...");
1023        self.start();
1024        info!(self.ctx.log(), "Started network just fine.");
1025        if let Some(promise) = self.notify_ready.take() {
1026            promise
1027                .complete()
1028                .unwrap_or_else(|e| error!(self.ctx.log(), "Could not start network! {:?}", e))
1029        }
1030        Handled::OK
1031    }
1032
1033    fn on_stop(&mut self) -> HandlerResult {
1034        info!(self.ctx.log(), "Stopping network...");
1035        self.stop();
1036        info!(self.ctx.log(), "Stopped network.");
1037        Handled::OK
1038    }
1039
1040    fn on_kill(&mut self) -> HandlerResult {
1041        info!(self.ctx.log(), "Killing network...");
1042        self.kill();
1043        info!(self.ctx.log(), "Killed network.");
1044        Handled::OK
1045    }
1046}
1047
1048impl Provide<NetworkStatusPort> for NetworkDispatcher {
1049    fn handle(&mut self, event: <NetworkStatusPort as Port>::Request) -> HandlerResult {
1050        debug!(
1051            self.ctx.log(),
1052            "Received NetworkStatusPort Request {:?}", event
1053        );
1054        match event {
1055            NetworkStatusRequest::DisconnectSystem(system_path) => {
1056                self.close_channel(system_path.socket_address());
1057            }
1058            NetworkStatusRequest::ConnectSystem(system_path) => {
1059                if let Some(bridge) = &self.net_bridge {
1060                    bridge
1061                        .connect(system_path.protocol(), system_path.socket_address())
1062                        .unwrap();
1063                }
1064            }
1065            NetworkStatusRequest::BlockIp(ip_addr) => {
1066                debug!(self.ctx.log(), "Got BlockIp: {:?}", ip_addr);
1067                if let Some(bridge) = &self.net_bridge {
1068                    bridge.block_ip(ip_addr).unwrap();
1069                }
1070            }
1071            NetworkStatusRequest::BlockSystem(system_path) => {
1072                debug!(self.ctx.log(), "Got BlockSystem: {:?}", system_path);
1073                if let Some(bridge) = &self.net_bridge {
1074                    bridge.block_socket(system_path.socket_address()).unwrap();
1075                }
1076            }
1077            NetworkStatusRequest::BlockIpNet(ip_net) => {
1078                debug!(self.ctx.log(), "Got BlockIpNet: {:?}", ip_net);
1079                if let Some(bridge) = &self.net_bridge {
1080                    bridge.block_ip_net(ip_net).unwrap();
1081                }
1082            }
1083            NetworkStatusRequest::AllowIp(ip_addr) => {
1084                debug!(self.ctx.log(), "Got AllowIp: {:?}", ip_addr);
1085                if let Some(bridge) = &self.net_bridge {
1086                    bridge.allow_ip(ip_addr).unwrap();
1087                }
1088            }
1089            NetworkStatusRequest::AllowSystem(system_path) => {
1090                debug!(self.ctx.log(), "Got AllowSystem: {:?}", system_path);
1091                if let Some(bridge) = &self.net_bridge {
1092                    bridge.allow_socket(system_path.socket_address()).unwrap();
1093                }
1094            }
1095            NetworkStatusRequest::AllowIpNet(ip_net) => {
1096                debug!(self.ctx.log(), "Got AllowIpNet: {:?}", ip_net);
1097                if let Some(bridge) = &self.net_bridge {
1098                    bridge.allow_ip_net(ip_net).unwrap();
1099                }
1100            }
1101        }
1102        Handled::OK
1103    }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::{super::*, *};
1109    use crate::net_test_helpers::{PingerAct, PongerAct};
1110    use std::{thread, time::Duration};
1111
1112    #[test]
1113    fn failed_network() {
1114        let conflicting_socket =
1115            std::net::TcpListener::bind("127.0.0.1:0").expect("temporary conflicting listener");
1116        let conflicting_addr = conflicting_socket.local_addr().expect("listener address");
1117        let mut cfg = kompact::test_support::test_kompact_config();
1118        println!("Configuring network");
1119        cfg.system_components(DeadletterBox::new, {
1120            let net_config = NetworkConfig::new(conflicting_addr);
1121            net_config.build()
1122        });
1123        assert!(
1124            cfg.build().wait().is_err(),
1125            "network startup should fail while another listener owns the socket"
1126        );
1127    }
1128
1129    #[test]
1130    fn network_cleanup() {
1131        let mut cfg = kompact::test_support::test_kompact_config();
1132        println!("Configuring network");
1133        cfg.system_components(DeadletterBox::new, {
1134            let net_config =
1135                NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
1136            net_config.build()
1137        });
1138        println!("Starting KompactSystem");
1139        let system = cfg.build().wait().expect("KompactSystem");
1140        println!("KompactSystem started just fine.");
1141        let named_path = ActorPath::Named(NamedPath::with_system(
1142            system.system_path(),
1143            vec!["test".into()],
1144        ));
1145        println!("Got path: {}", named_path);
1146        let port = system.system_path().port();
1147        println!("Got port: {}", port);
1148        println!("Shutting down first system...");
1149        system
1150            .shutdown()
1151            .wait()
1152            .expect("KompactSystem failed to shut down!");
1153        println!("System shut down.");
1154        let mut cfg2 = kompact::test_support::test_kompact_config();
1155        println!("Configuring network");
1156        cfg2.system_components(DeadletterBox::new, {
1157            let net_config =
1158                NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
1159            net_config.build()
1160        });
1161        println!("Starting 2nd KompactSystem");
1162        let system2 = cfg2.build().wait().expect("KompactSystem");
1163        thread::sleep(Duration::from_millis(100));
1164        println!("2nd KompactSystem started just fine.");
1165        let named_path2 = ActorPath::Named(NamedPath::with_system(
1166            system2.system_path(),
1167            vec!["test".into()],
1168        ));
1169        println!("Got path: {}", named_path);
1170        assert_eq!(named_path, named_path2);
1171        system2
1172            .shutdown()
1173            .wait()
1174            .expect("2nd KompactSystem failed to shut down!");
1175    }
1176
1177    /// This is similar to network_cleanup test that will trigger a failed binding.
1178    /// The retry should occur when system2 is building and should succeed after system1 is killed.
1179    #[test]
1180    fn network_cleanup_with_timeout() {
1181        let mut cfg = kompact::test_support::test_kompact_config();
1182        println!("Configuring network");
1183        cfg.system_components(DeadletterBox::new, {
1184            let net_config =
1185                NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
1186            net_config.build()
1187        });
1188        println!("Starting KompactSystem");
1189        let system = cfg.build().wait().expect("KompactSystem");
1190        println!("KompactSystem started just fine.");
1191        let named_path = ActorPath::Named(NamedPath::with_system(
1192            system.system_path(),
1193            vec!["test".into()],
1194        ));
1195        println!("Got path: {}", named_path);
1196        let port = system.system_path().port();
1197        println!("Got port: {}", port);
1198
1199        thread::Builder::new()
1200            .name("System1 Killer".to_string())
1201            .spawn(move || {
1202                thread::sleep(Duration::from_millis(100));
1203                println!("Shutting down first system...");
1204                system
1205                    .shutdown()
1206                    .wait()
1207                    .expect("KompactSystem failed to shut down!");
1208                println!("System shut down.");
1209            })
1210            .ok();
1211
1212        let mut cfg2 = kompact::test_support::test_kompact_config();
1213        println!("Configuring network");
1214        cfg2.system_components(DeadletterBox::new, {
1215            let net_config =
1216                NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
1217            net_config.build()
1218        });
1219        println!("Starting 2nd KompactSystem");
1220        let system2 = cfg2.build().wait().expect("KompactSystem");
1221        thread::sleep(Duration::from_millis(100));
1222        println!("2nd KompactSystem started just fine.");
1223        let named_path2 = ActorPath::Named(NamedPath::with_system(
1224            system2.system_path(),
1225            vec!["test".into()],
1226        ));
1227        println!("Got path: {}", named_path);
1228        assert_eq!(named_path, named_path2);
1229        system2
1230            .shutdown()
1231            .wait()
1232            .expect("2nd KompactSystem failed to shut down!");
1233    }
1234
1235    #[test]
1236    fn test_system_path_timing() {
1237        let mut cfg = kompact::test_support::test_kompact_config();
1238        println!("Configuring network");
1239        cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
1240        println!("Starting KompactSystem");
1241        let system = cfg.build().wait().expect("KompactSystem");
1242        println!("KompactSystem started just fine.");
1243        let named_path = ActorPath::Named(NamedPath::with_system(
1244            system.system_path(),
1245            vec!["test".into()],
1246        ));
1247        println!("Got path: {}", named_path);
1248        // if nothing panics the test succeeds
1249    }
1250
1251    #[test]
1252    // Identical with `remote_lost_and_continued_connection` up to the final sleep time and assertion
1253    // system1 times out in its reconnection attempts and drops the enqueued buffers.
1254    // After indirectly asserting that the queue was dropped we start up a new pinger, and assert that it succeeds.
1255    fn cleanup_bufferchunks_from_dead_actors() {
1256        let system1 = || {
1257            let mut cfg = kompact::test_support::test_kompact_config();
1258            cfg.system_components(
1259                DeadletterBox::new,
1260                NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work")).build(),
1261            );
1262            cfg.build().wait().expect("KompactSystem")
1263        };
1264        let system2 = |port| {
1265            let mut cfg = kompact::test_support::test_kompact_config();
1266            cfg.system_components(
1267                DeadletterBox::new,
1268                NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port)).build(),
1269            );
1270            cfg.build().wait().expect("KompactSystem")
1271        };
1272
1273        // Set-up system2a
1274        let system2a = system2(0);
1275        let port = system2a.system_path().port();
1276        //let (ponger_unique, pouf) = remote.create_and_register(PongerAct::new);
1277        let (ponger_named, ponf) = system2a.create_and_register(PongerAct::new_lazy);
1278        let poaf = system2a.register_by_alias(&ponger_named, "custom_name");
1279        ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1280        poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1281        let named_path = ActorPath::Named(NamedPath::with_system(
1282            system2a.system_path(),
1283            vec!["custom_name".into()],
1284        ));
1285        let named_path_clone = named_path;
1286        // Set-up system1
1287        let system1: KompactSystem = system1();
1288        let (pinger_named, pinf) =
1289            system1.create_and_register(move || PingerAct::new_eager(named_path_clone));
1290        pinf.wait_expect(Duration::from_millis(1000), "Pinger failed to register!");
1291
1292        // Kill system2a
1293        system2a.shutdown().wait().ok();
1294        // Start system1
1295        system1.start(&pinger_named);
1296        // Wait for the pings to be sent from the actor to the NetworkDispatch and onto the Thread
1297        thread::sleep(Duration::from_millis(100));
1298        // Kill the actor and wait for its BufferChunk to reach the NetworkDispatch and let the reaping try at least once
1299        system1.kill(pinger_named);
1300
1301        // TODO no sleeps!
1302        thread::sleep(Duration::from_millis(5000));
1303
1304        // Assertion 1: The Network_Dispatcher on system1 has >0 buffers to cleanup
1305        let mut garbage_len = 0;
1306        system1.with_dispatcher_definition(|dispatcher| {
1307            let dispatcher = dispatcher
1308                .downcast_mut::<NetworkDispatcher>()
1309                .expect("expected kompact-net NetworkDispatcher");
1310            garbage_len = dispatcher.garbage_buffers.len();
1311        });
1312        assert_ne!(0, garbage_len);
1313
1314        // Start up system2b
1315        println!("Setting up system2b");
1316        let system2b = system2(port);
1317        let (ponger_named, ponf) = system2b.create_and_register(PongerAct::new_lazy);
1318        let poaf = system2b.register_by_alias(&ponger_named, "custom_name");
1319        ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1320        poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
1321        println!("Starting actor on system2b");
1322        system2b.start(&ponger_named);
1323
1324        // We give the connection plenty of time to re-establish and transfer it's old queue and cleanup the BufferChunk
1325        // TODO no sleeps!
1326        thread::sleep(Duration::from_millis(10000));
1327
1328        // Assertion 2: The Network_Dispatcher on system1 now has 0 buffers to cleanup.
1329        system1.with_dispatcher_definition(|dispatcher| {
1330            let dispatcher = dispatcher
1331                .downcast_mut::<NetworkDispatcher>()
1332                .expect("expected kompact-net NetworkDispatcher");
1333            garbage_len = dispatcher.garbage_buffers.len();
1334        });
1335        assert_eq!(0, garbage_len);
1336
1337        system1
1338            .shutdown()
1339            .wait()
1340            .expect("Kompact didn't shut down properly");
1341        system2b
1342            .shutdown()
1343            .wait()
1344            .expect("Kompact didn't shut down properly");
1345    }
1346}