Skip to main content

kompact_net/transport/
mod.rs

1use super::*;
2use actors::Transport;
3use arc_swap::ArcSwap;
4use dispatch::lookup::ActorStore;
5
6use crate::{
7    NetworkStatus,
8    dispatch::NetworkConfig,
9    events::NetworkDispatcherEvent,
10    messaging::{DispatchData, DispatchEnvelope},
11    net::{SessionId, events::DispatchEvent, frames::*, network_thread::NetworkThreadBuilder},
12};
13use crossbeam_channel::{RecvError, SendError, Sender, unbounded as channel};
14use ipnet::IpNet;
15use kompact::runtime::{BacktraceSuffix, SourceCause};
16use mio::{Interest, Waker};
17use snafu::{IntoError, ResultExt};
18pub use std::net::SocketAddr;
19use std::{io, net::IpAddr, panic, sync::Arc, thread, time::Duration};
20
21pub(crate) mod network_channel;
22pub mod network_thread;
23pub(crate) mod udp_state;
24
/// The state of a connection
///
/// The `Connected`, `Closed`, and `Lost` variants carry the [`SessionId`] they refer to.
#[derive(Clone, Debug)]
pub enum ConnectionState {
    /// Newly created
    New,
    /// Initialising the connection
    Initializing,
    /// Connected
    Connected(SessionId),
    /// Closed gracefully, by request on either side
    Closed(SessionId),
    /// Connection was lost unexpectedly
    Lost(SessionId),
    /// Blocked by request from component through NetworkStatusPort
    Blocked,
    // Threw an error
    // Error(std::io::Error),
}
43
/// Transport protocol selector used by `Bridge::route` to pick the outgoing event kind.
pub enum Protocol {
    /// Route the data as a `DispatchEvent::SendTcp`
    Tcp,
    /// Route the data as a `DispatchEvent::SendUdp`
    Udp,
}
48impl From<Transport> for Protocol {
49    fn from(t: Transport) -> Self {
50        match t {
51            Transport::Tcp => Protocol::Tcp,
52            Transport::Udp => Protocol::Udp,
53            _ => unimplemented!("Unsupported Protocol"),
54        }
55    }
56}
57
/// Events on the network level
pub mod events {

    use crate::{messaging::DispatchData, net::SocketAddr};
    use ipnet::IpNet;
    use std::net::IpAddr;

    /// Events sent through the network `Bridge` to the `NetworkThread`
    #[derive(Debug)]
    pub enum DispatchEvent {
        /// Send the `DispatchData` over TCP to the receiver associated with the `SocketAddr`
        SendTcp(SocketAddr, DispatchData),
        /// Send the `DispatchData` over UDP to the receiver associated with the `SocketAddr`
        SendUdp(SocketAddr, DispatchData),
        /// Tells the network thread to Stop, will gracefully shutdown all channels.
        Stop,
        /// Tells the network thread to Die as soon as possible, without graceful shutdown.
        Kill,
        /// Tells the `NetworkThread` to open up a channel to the `SocketAddr`
        Connect(SocketAddr),
        /// Acknowledges a closed channel, required to ensure FIFO ordering under connection loss
        ClosedAck(SocketAddr),
        /// Tells the `NetworkThread` to gracefully close the channel to the `SocketAddr`
        Close(SocketAddr),
        /// Tells the `NetworkThread` to block the `SocketAddr`
        BlockSocket(SocketAddr),
        /// Tells the `NetworkThread` to block the `IpAddr`
        BlockIpAddr(IpAddr),
        /// Tells the `NetworkThread` to block the `IpNet`
        BlockIpNet(IpNet),
        /// Tells the `NetworkThread` to allow the `SocketAddr`
        AllowSocket(SocketAddr),
        /// Tells the `NetworkThread` to allow the `IpAddr`
        AllowIpAddr(IpAddr),
        /// Tells the `NetworkThread` to allow the `IpNet`
        AllowIpNet(IpNet),
    }
}
96
/// The configuration for the network `Bridge`
///
/// Currently only holds a retry strategy; the `Bridge` in this file does not store a
/// `BridgeConfig` (its `cfg` field is commented out), hence the `dead_code` allowance.
#[allow(dead_code)]
pub struct BridgeConfig {
    // Strategy to apply when retrying; not read anywhere in this file.
    retry_strategy: RetryStrategy,
}
102
103impl BridgeConfig {
104    /// Create a new config
105    ///
106    /// This is the same as the [Default](std::default::Default) implementation.
107    #[allow(dead_code)]
108    pub fn new() -> Self {
109        BridgeConfig::default()
110    }
111}
112
// Retry behaviour stored in `BridgeConfig`; nothing in this file reads it yet.
#[allow(dead_code)]
enum RetryStrategy {
    // NOTE(review): presumably `base_ms` is the initial delay in milliseconds and
    // `num_tries` the maximum number of attempts -- confirm against the consumer.
    ExponentialBackoff { base_ms: u64, num_tries: usize },
}
117
118#[allow(dead_code)]
119impl Default for BridgeConfig {
120    fn default() -> Self {
121        let retry_strategy = RetryStrategy::ExponentialBackoff {
122            base_ms: 100,
123            num_tries: 5,
124        };
125        BridgeConfig { retry_strategy }
126    }
127}
128
/// Bridge to Network Threads. Routes outbound messages to the correct network thread. Single threaded for now.
pub struct Bridge {
    // Network-specific configuration (field currently disabled)
    //cfg: BridgeConfig,
    /// Bridge logger; a clone is handed to the network thread wrapper to report panics
    log: KompactLogger,
    // Shared actor reference lookup table (field currently disabled)
    // lookup: Arc<ArcSwap<ActorStore>>,
    // Network Thread stuff:
    // network_thread: Box<NetworkThread>,
    // ^ Can we avoid storing this by moving it into itself?
    /// Sending half of the event queue consumed by the network thread
    network_input_queue: Sender<events::DispatchEvent>,
    /// Waker used to wake the network thread after an event has been queued
    waker: Waker,
    // Tokio Runtime (field currently disabled)
    // tokio_runtime: Option<Runtime>,
    /// Reference back to the Kompact dispatcher
    dispatcher: Option<DispatcherRef>,
    /// Socket the network actually bound on
    bound_address: Option<SocketAddr>,
    /// Future that `stop`/`kill` wait on after sending their shutdown event
    shutdown_future: KFuture<()>,
}
150
impl Bridge {
    /// Creates a new bridge and boots its network thread
    ///
    /// Builds a `NetworkThreadBuilder` for `addr`, spawns the network thread, and then
    /// blocks until the thread signals that it has started, waiting at most the boot
    /// timeout (in milliseconds) taken from `network_config`.
    ///
    /// # Returns
    /// A tuple consisting of the new Bridge object and the socket address reported by the
    /// network thread builder.
    ///
    /// # Panics
    /// - if the `NetworkThreadBuilder` cannot be created
    /// - if the builder does not yield a waker
    /// - if the network thread cannot be spawned
    /// - if waiting for the start-up signal fails (e.g. the boot timeout elapses)
    pub fn new(
        lookup: Arc<ArcSwap<ActorStore>>,
        network_thread_log: KompactLogger,
        bridge_log: KompactLogger,
        addr: SocketAddr,
        dispatcher_ref: DispatcherRef,
        network_config: &NetworkConfig,
    ) -> (Self, SocketAddr) {
        // `sender` stays with the bridge, `receiver` is owned by the network thread.
        let (sender, receiver) = channel();
        // The promise side is given to the builder; `stop`/`kill` wait on the future side.
        let (shutdown_p, shutdown_f) = promise();
        match NetworkThreadBuilder::new(
            network_thread_log,
            addr,
            lookup,
            receiver,
            shutdown_p,
            dispatcher_ref.clone(),
            network_config.clone(),
        ) {
            Ok(mut network_thread_builder) => {
                // Read the address and take the waker before the builder moves into the thread.
                let bound_address = network_thread_builder.address;
                let waker = network_thread_builder
                    .take_waker()
                    .expect("NetworkThread poll error");

                let (started_p, started_f) = promise();
                run_network_thread(
                    network_thread_builder,
                    bridge_log.clone(),
                    started_p,
                    dispatcher_ref.clone(),
                )
                .expect("Failed to spawn NetworkThread");
                // Block until the thread has completed `started_p` or the timeout hits.
                started_f
                    .wait_timeout(Duration::from_millis(network_config.get_boot_timeout()))
                    .expect("NetworkThread time-out during boot sequence");

                let bridge = Bridge {
                    // cfg: BridgeConfig::default(),
                    log: bridge_log,
                    // lookup,
                    network_input_queue: sender,
                    waker,
                    dispatcher: Some(dispatcher_ref),
                    bound_address: Some(bound_address),
                    shutdown_future: shutdown_f,
                };

                (bridge, bound_address)
            }
            Err(e) => {
                panic!("Failed to build a Network Thread, error: {:?}", e);
            }
        }
    }

    /// Queues `event` for the network thread and then wakes it.
    ///
    /// # Errors
    /// Returns a send error if the event queue rejects the event, or an IO error if the
    /// waker fails.
    fn send_and_wake(&self, event: DispatchEvent) -> Result<(), NetworkBridgeError> {
        self.network_input_queue
            .send(event)
            .context(network_bridge_error::SendSnafu)?;
        self.waker.wake().context(network_bridge_error::IoSnafu)?;
        Ok(())
    }

    /// Sets the dispatcher reference, returning the previously stored one
    pub fn set_dispatcher(&mut self, dispatcher: DispatcherRef) -> Option<DispatcherRef> {
        self.dispatcher.replace(dispatcher)
    }

    /// Stops the bridge gracefully
    ///
    /// Sends `DispatchEvent::Stop` and then waits on the shutdown future.
    ///
    /// # Errors
    /// Fails if the stop event cannot be queued or the network thread cannot be woken.
    pub fn stop(self) -> Result<(), NetworkBridgeError> {
        debug!(self.log, "Stopping NetworkBridge...");
        self.send_and_wake(DispatchEvent::Stop)?;
        self.shutdown_future.wait(); // should block until something is sent
        debug!(self.log, "Stopped NetworkBridge.");
        Ok(())
    }

    /// Kills the Network
    ///
    /// Sends `DispatchEvent::Kill` and then waits on the shutdown future.
    ///
    /// # Errors
    /// Fails if the kill event cannot be queued or the network thread cannot be woken.
    pub fn kill(self) -> Result<(), NetworkBridgeError> {
        debug!(self.log, "Killing NetworkBridge...");
        self.send_and_wake(DispatchEvent::Kill)?;
        self.shutdown_future.wait(); // should block until something is sent
        debug!(self.log, "Stopped NetworkBridge.");
        Ok(())
    }

    /// Returns the local address if already bound
    pub fn local_addr(&self) -> &Option<SocketAddr> {
        &self.bound_address
    }

    /// Forwards `data` to the NetworkThread and makes sure that it will wake up.
    ///
    /// `protocol` decides whether the data is queued as a TCP or a UDP send event.
    pub(crate) fn route(
        &self,
        addr: SocketAddr,
        data: DispatchData,
        protocol: Protocol,
    ) -> Result<(), NetworkBridgeError> {
        let event = match protocol {
            Protocol::Tcp => DispatchEvent::SendTcp(addr, data),
            Protocol::Udp => DispatchEvent::SendUdp(addr, data),
        };
        self.send_and_wake(event)
    }

    /// Requests that the NetworkThread opens a connection to the provided `addr`.
    ///
    /// Only `Transport::Tcp` is accepted; the request is queued as a
    /// `DispatchEvent::Connect` and this method does not wait for the outcome.
    ///
    /// # Side effects
    /// When the connection is successful:
    ///     - a `ConnectionState::Connected` is dispatched on the network bridge event queue
    ///     - NetworkThread will listen for incoming messages and write outgoing messages on the channel
    ///
    /// # Errors
    /// If the provided protocol is not supported, or if the request cannot be queued
    pub fn connect(&self, proto: Transport, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
        match proto {
            Transport::Tcp => self.send_and_wake(DispatchEvent::Connect(addr)),
            other => Err(NetworkBridgeError::unsupported_protocol(other)),
        }
    }

    /// Acknowledges a closed channel, required to ensure FIFO ordering under connection loss
    pub fn ack_closed(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::ClosedAck(addr))
    }

    /// Requests that the NetworkThread closes the channel to `addr`
    pub fn close_channel(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::Close(addr))
    }

    /// Requests the NetworkThread to block the socket addr
    pub fn block_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::BlockSocket(addr))
    }

    /// Requests the NetworkThread to block the ip address ip_addr
    pub fn block_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::BlockIpAddr(ip_addr))
    }

    /// Requests the NetworkThread to block the ip subnet ip_net
    pub fn block_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::BlockIpNet(ip_net))
    }

    /// Requests the NetworkThread to unblock the socket addr
    pub fn allow_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::AllowSocket(addr))
    }

    /// Requests the NetworkThread to unblock the ip address ip_addr
    pub fn allow_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::AllowIpAddr(ip_addr))
    }

    /// Requests the NetworkThread to unblock the ip subnet ip_net
    pub fn allow_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeError> {
        self.send_and_wake(DispatchEvent::AllowIpNet(ip_net))
    }
}
319
/// Spawns the OS thread named `network_thread` that builds and runs the network thread.
///
/// Inside the new thread the `builder` is turned into a network thread,
/// `started_promise` is completed, and the thread's `run` loop is entered. The whole
/// sequence is wrapped in `catch_unwind`: if it panics, the panic payload is logged to
/// `logger` and a `NetworkStatus::CriticalNetworkFailure` event is sent to
/// `dispatcher_ref`.
///
/// # Errors
/// Returns the IO error from `thread::Builder::spawn` if the thread cannot be created.
/// The join handle is discarded on success.
fn run_network_thread(
    builder: NetworkThreadBuilder,
    logger: KompactLogger,
    started_promise: KPromise<()>,
    dispatcher_ref: DispatcherRef,
) -> async_std::io::Result<()> {
    thread::Builder::new()
        .name("network_thread".to_string())
        .spawn(move || {
            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                let network_thread = builder.build();
                // Signal the creator (which is waiting with a timeout) that boot succeeded.
                started_promise
                    .complete()
                    .expect("NetworkThread started but failed to fulfil promise");
                network_thread.run()
            })) {
                // Panic payloads are usually `&str` or `String`; fall back to the type id.
                if let Some(error_msg) = e.downcast_ref::<&str>() {
                    error!(logger, "NetworkThread panicked with: {:?}", &error_msg);
                } else if let Some(error_msg) = e.downcast_ref::<String>() {
                    error!(logger, "NetworkThread panicked with: {:?}", error_msg);
                } else {
                    error!(
                        logger,
                        "NetworkThread panicked with type id={:?}",
                        (*e).type_id()
                    );
                }
                // Let the dispatcher know the network layer is gone.
                dispatcher_ref.tell(DispatchEnvelope::Event(Box::new(
                    NetworkDispatcherEvent::Network(NetworkStatus::CriticalNetworkFailure),
                )))
            }
        })
        .map(|_| ())
}
354
/// Error returned when the network bridge fails.
///
/// Opaque wrapper around a boxed `NetworkBridgeErrorKind`, so the public error type
/// stays pointer-sized while the detailed variants remain private.
#[derive(Debug, snafu::Snafu)]
pub struct NetworkBridgeError(Box<NetworkBridgeErrorKind>);
358
// Detailed failure cases behind `NetworkBridgeError`.
//
// The context selectors are generated into the `network_bridge_error` module with
// `pub(crate)` visibility. Every variant records the source location implicitly and
// carries an optional backtrace.
#[derive(Debug, snafu::Snafu)]
#[snafu(module(network_bridge_error), visibility(pub(crate)))]
enum NetworkBridgeErrorKind {
    // A transport the bridge cannot handle was requested.
    #[snafu(display(
        "network bridge does not support protocol {protocol:?} ({location}){}",
        BacktraceSuffix(backtrace.as_ref())
    ))]
    UnsupportedProtocol {
        protocol: Transport,
        #[snafu(implicit)]
        location: snafu::Location,
        backtrace: Option<snafu::Backtrace>,
    },
    // Sending a `DispatchEvent` on the event queue failed.
    #[snafu(display(
        "failed to send network bridge event ({location}).{}",
        SourceCause(source, None, backtrace.as_ref())
    ))]
    Send {
        source: SendError<DispatchEvent>,
        #[snafu(implicit)]
        location: snafu::Location,
        backtrace: Option<snafu::Backtrace>,
    },
    // An IO operation failed (e.g. waking the network thread).
    #[snafu(display(
        "network bridge IO failed ({location}).{}",
        SourceCause(source, None, backtrace.as_ref())
    ))]
    Io {
        source: io::Error,
        #[snafu(implicit)]
        location: snafu::Location,
        backtrace: Option<snafu::Backtrace>,
    },
    // Receiving from a channel failed.
    #[snafu(display(
        "network bridge receive failed ({location}).{}",
        SourceCause(source, None, backtrace.as_ref())
    ))]
    Receive {
        source: RecvError,
        #[snafu(implicit)]
        location: snafu::Location,
        backtrace: Option<snafu::Backtrace>,
    },
    // Serialising data failed.
    #[snafu(display(
        "network bridge serialisation failed ({location}).{}",
        SourceCause(source, None, backtrace.as_ref())
    ))]
    Serialisation {
        source: SerError,
        #[snafu(implicit)]
        location: snafu::Location,
        backtrace: Option<snafu::Backtrace>,
    },
    // Free-form failure described only by a message.
    #[snafu(display(
        "network bridge failed: {message} ({location}){}",
        BacktraceSuffix(backtrace.as_ref())
    ))]
    Message {
        message: String,
        #[snafu(implicit)]
        location: snafu::Location,
        backtrace: Option<snafu::Backtrace>,
    },
}
423
424impl NetworkBridgeError {
425    /// Construct an unsupported-protocol bridge error.
426    #[track_caller]
427    pub fn unsupported_protocol(protocol: Transport) -> Self {
428        network_bridge_error::UnsupportedProtocolSnafu { protocol }
429            .build()
430            .into()
431    }
432
433    /// Construct a message-only bridge error.
434    #[track_caller]
435    pub fn message(message: impl Into<String>) -> Self {
436        network_bridge_error::MessageSnafu {
437            message: message.into(),
438        }
439        .build()
440        .into()
441    }
442}
443
444impl From<SendError<DispatchEvent>> for NetworkBridgeError {
445    #[track_caller]
446    fn from(source: SendError<DispatchEvent>) -> Self {
447        network_bridge_error::SendSnafu.into_error(source).into()
448    }
449}
450
451impl From<io::Error> for NetworkBridgeError {
452    #[track_caller]
453    fn from(source: io::Error) -> Self {
454        network_bridge_error::IoSnafu.into_error(source).into()
455    }
456}
457
458impl From<RecvError> for NetworkBridgeError {
459    #[track_caller]
460    fn from(source: RecvError) -> Self {
461        network_bridge_error::ReceiveSnafu.into_error(source).into()
462    }
463}
464
465impl From<SerError> for NetworkBridgeError {
466    #[track_caller]
467    fn from(source: SerError) -> Self {
468        network_bridge_error::SerialisationSnafu
469            .into_error(source)
470            .into()
471    }
472}
473
// Tests for the `Display` output and `source()` chain of `NetworkBridgeError`.
#[cfg(test)]
mod bridge_error_tests {
    use super::*;
    use kompact::test_support::assert_display_matches;
    use std::error::Error;

    // An IO error converted via `From` must show the source message and keep it
    // reachable through `Error::source`.
    #[test]
    fn network_bridge_error_display_includes_source_and_location() {
        let source = io::Error::other("waker failed");
        let error = NetworkBridgeError::from(source);

        // The expected text is a multi-line literal, hence the column-0 continuation lines.
        assert_display_matches(
            &error.to_string(),
            "\
network bridge IO failed {LOC}.
  Caused by: waker failed.",
            file!(),
        );
        assert_eq!(
            "waker failed",
            error.source().expect("io source").to_string()
        );
    }

    // A send on a channel whose receiver was dropped must surface as a `Send` error
    // that preserves the original `SendError` as its source.
    #[test]
    fn network_bridge_send_error_preserves_send_error_source() {
        let (sender, receiver) = channel::<DispatchEvent>();
        // Dropping the receiver disconnects the channel so the send below fails.
        drop(receiver);
        let source = sender
            .send(DispatchEvent::Stop)
            .expect_err("send should fail without receiver");
        let error: NetworkBridgeError = network_bridge_error::SendSnafu.into_error(source).into();

        assert_display_matches(
            &error.to_string(),
            "\
failed to send network bridge event {LOC}.
  Caused by: sending on a disconnected channel.",
            file!(),
        );
        assert_eq!(
            "sending on a disconnected channel",
            error.source().expect("send source").to_string()
        );
    }
}
520
521/*
522* Error handling helper functions
523*/
/// Returns `true` when `err` has kind [`io::ErrorKind::WouldBlock`].
pub(crate) fn would_block(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}
527
/// Returns `true` when `err` has kind [`io::ErrorKind::Interrupted`].
pub(crate) fn interrupted(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::Interrupted)
}
531
/// Returns `true` when `err` has kind [`io::ErrorKind::InvalidInput`].
///
/// NOTE(review): the name suggests this kind is how "no buffer space" shows up for the
/// callers of this helper -- confirm against the call sites.
pub(crate) fn no_buffer_space(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::InvalidInput)
}
535
/// Returns `true` when `err` has kind [`io::ErrorKind::ConnectionReset`].
pub(crate) fn connection_reset(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::ConnectionReset)
}
539
/// Returns `true` when `err` has kind [`io::ErrorKind::BrokenPipe`].
pub(crate) fn broken_pipe(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::BrokenPipe)
}
543
544pub(crate) fn out_of_buffers(err: &SerError) -> bool {
545    err.kind() == SerErrorKind::NoBuffersAvailable
546}
547
548/// A module with helper functions for testing network configurations/implementations
549pub mod net_test_helpers {
550    use crate::{
551        NetworkStatus,
552        NetworkStatusPort,
553        NetworkStatusRequest,
554        messaging::{DispatchData, SerialisedFrame},
555        prelude::*,
556    };
557    use crossbeam_channel::Sender;
558    use std::{
559        cmp::Ordering,
560        collections::VecDeque,
561        fmt::{Debug, Formatter},
562        time::{Duration, SystemTime, UNIX_EPOCH},
563    };
564    use uuid::Uuid;
565
    /// The number of Ping-Pong messages, used in assertions and Pingers/BigPingers
    ///
    /// A `PingerAct` keeps sending pings while its pong count is below this value and
    /// completes its completion promise when the count reaches it.
    pub const PING_COUNT: u64 = 10;
568
    // Ping message; on the wire it is the `PING_ID` tag followed by `i` (9 bytes total).
    #[derive(Debug, Clone)]
    struct PingMsg {
        // Sequence number; the ponger echoes it back in the matching `PongMsg`.
        i: u64,
    }
573
    // Pong message; on the wire it is the `PONG_ID` tag followed by `i` (9 bytes total).
    #[derive(Debug, Clone)]
    struct PongMsg {
        // Sequence number copied from the `PingMsg` being answered.
        i: u64,
    }
578
    // Stateless serialiser/deserialiser shared by `PingMsg` and `PongMsg`.
    #[derive(Debug, Clone)]
    struct PingPongSer;
581
    impl PingPongSer {
        // Serialisation id used for both ping and pong messages.
        const SID: SerId = 42;
    }
585
    // One-byte tags written in front of the payload to tell pings and pongs apart,
    // since both share the same serialisation id.
    const PING_ID: i8 = 1;
    const PONG_ID: i8 = 2;
588
589    impl Serialiser<PingMsg> for PingPongSer {
590        fn ser_id(&self) -> SerId {
591            Self::SID
592        }
593
594        fn size_hint(&self) -> Option<usize> {
595            Some(9)
596        }
597
598        fn serialise(&self, v: &PingMsg, buf: &mut dyn BufMut) -> Result<(), SerError> {
599            buf.put_i8(PING_ID);
600            buf.put_u64(v.i);
601            Result::Ok(())
602        }
603    }
604
605    impl Serialisable for PingMsg {
606        fn ser_id(&self) -> SerId {
607            42
608        }
609
610        fn size_hint(&self) -> Option<usize> {
611            Some(9)
612        }
613
614        fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
615            buf.put_i8(PING_ID);
616            buf.put_u64(self.i);
617            Result::Ok(())
618        }
619
620        fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
621            Ok(self)
622        }
623    }
624
625    impl Serialisable for PongMsg {
626        fn ser_id(&self) -> SerId {
627            42
628        }
629
630        fn size_hint(&self) -> Option<usize> {
631            Some(9)
632        }
633
634        fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
635            buf.put_i8(PONG_ID);
636            buf.put_u64(self.i);
637            Result::Ok(())
638        }
639
640        fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
641            Ok(self)
642        }
643    }
644
645    impl Serialiser<PongMsg> for PingPongSer {
646        fn ser_id(&self) -> SerId {
647            Self::SID
648        }
649
650        fn size_hint(&self) -> Option<usize> {
651            Some(9)
652        }
653
654        fn serialise(&self, v: &PongMsg, buf: &mut dyn BufMut) -> Result<(), SerError> {
655            buf.put_i8(PONG_ID);
656            buf.put_u64(v.i);
657            Result::Ok(())
658        }
659    }
660
661    impl Deserialiser<PingMsg> for PingPongSer {
662        const SER_ID: SerId = Self::SID;
663
664        fn deserialise(buf: &mut dyn Buf) -> Result<PingMsg, SerError> {
665            if buf.remaining() < 9 {
666                return Err(SerError::invalid_data(format!(
667                    "Serialised typed has 9bytes but only {}bytes remain in buffer.",
668                    buf.remaining()
669                )));
670            }
671            match buf.get_i8() {
672                PING_ID => {
673                    let i = buf.get_u64();
674                    Ok(PingMsg { i })
675                }
676                PONG_ID => Err(SerError::invalid_type(
677                    "Found PongMsg, but expected PingMsg.",
678                )),
679                id => Err(SerError::invalid_type(format!(
680                    "Found unknown id {}, but expected PingMsg.",
681                    id
682                ))),
683            }
684        }
685    }
686
687    impl Deserialiser<PongMsg> for PingPongSer {
688        const SER_ID: SerId = Self::SID;
689
690        fn deserialise(buf: &mut dyn Buf) -> Result<PongMsg, SerError> {
691            if buf.remaining() < 9 {
692                return Err(SerError::invalid_data(format!(
693                    "Serialised typed has 9bytes but only {}bytes remain in buffer.",
694                    buf.remaining()
695                )));
696            }
697            match buf.get_i8() {
698                PONG_ID => {
699                    let i = buf.get_u64();
700                    Ok(PongMsg { i })
701                }
702                PING_ID => Err(SerError::invalid_type(
703                    "Found PingMsg, but expected PongMsg.",
704                )),
705                id => Err(SerError::invalid_type(format!(
706                    "Found unknown id {}, but expected PingMsg.",
707                    id
708                ))),
709            }
710        }
711    }
712
    /// An actor which will send `PingMsg` to `target` over the network and counts the number of
    /// `PongMsg` replies it receives.
    /// Target should be a [PongerAct](PongerAct).
    #[derive(ComponentDefinition)]
    pub struct PingerAct {
        ctx: ComponentContext<PingerAct>,
        // Path the pings are sent to.
        target: ActorPath,
        /// The number of `PongMsg` received
        pub count: u64,
        // `true`: send with `tell_serialised`; `false`: send with `tell`.
        eager: bool,
        // Completed once `count` reaches `PING_COUNT`; set by `completion_future`.
        promise: Option<KPromise<()>>,
        /// Contains the first `SessionId` the Pinger received a message from
        pub pong_system_session: Option<SessionId>,
    }
727
728    impl PingerAct {
729        /// Creates a new `PingerAct` sending messages using
730        /// [Lazy Serialisation](crate::actors::ActorPath#method.tell)
731        pub fn new_lazy(target: ActorPath) -> PingerAct {
732            PingerAct {
733                ctx: ComponentContext::uninitialised(),
734                target,
735                count: 0,
736                eager: false,
737                promise: None,
738                pong_system_session: None,
739            }
740        }
741
742        /// Creates a new `PingerAct` sending messages using
743        /// [Eager Serialisation](crate::actors::ActorPath#method.tell_serialised)
744        pub fn new_eager(target: ActorPath) -> PingerAct {
745            PingerAct {
746                ctx: ComponentContext::uninitialised(),
747                target,
748                count: 0,
749                eager: true,
750                promise: None,
751                pong_system_session: None,
752            }
753        }
754
755        /// Creates a future which will be completed by the Pinger when it has received pongs for
756        /// all the pings it sent
757        pub fn completion_future(&mut self) -> KFuture<()> {
758            let (promise, future) = promise();
759            self.promise = Some(promise);
760            future
761        }
762    }
763
764    impl ComponentLifecycle for PingerAct {
765        fn on_start(&mut self) -> HandlerResult {
766            debug!(self.ctx.log(), "Starting");
767            if self.eager {
768                self.target
769                    .tell_serialised(PingMsg { i: 0 }, self)
770                    .expect("serialise");
771            } else {
772                self.target.tell(PingMsg { i: 0 }, self);
773            }
774            Handled::OK
775        }
776    }
777
778    impl Actor for PingerAct {
779        type Message = Never;
780
781        fn receive_local(&mut self, _pong: Self::Message) -> HandlerResult {
782            unimplemented!()
783        }
784
785        fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
786            if self.pong_system_session.is_none() {
787                self.pong_system_session = msg.session();
788            }
789            match msg.try_deserialise::<PongMsg, PingPongSer>() {
790                Ok(pong) => {
791                    debug!(self.ctx.log(), "Got msg {:?}", pong);
792                    self.count += 1;
793                    match self.count.cmp(&PING_COUNT) {
794                        Ordering::Less if self.eager => {
795                            self.target
796                                .tell_serialised(PingMsg { i: pong.i + 1 }, self)
797                                .expect("serialise");
798                        }
799                        Ordering::Less => {
800                            self.target.tell(PingMsg { i: pong.i + 1 }, self);
801                        }
802                        Ordering::Equal if self.promise.is_some() => {
803                            self.promise
804                                .take()
805                                .unwrap()
806                                .complete()
807                                .expect("Failed to fulfil promise");
808                        }
809                        _ => (), // ignore
810                    }
811                }
812                Err(e) => error!(self.ctx.log(), "Error deserialising PongMsg: {:?}", e),
813            }
814            Handled::OK
815        }
816    }
817
    /// An actor which replies to `PingMsg` with a `PongMsg`
    #[derive(ComponentDefinition)]
    pub struct PongerAct {
        ctx: ComponentContext<PongerAct>,
        // `true`: reply with `tell_serialised`; `false`: reply with `tell`.
        eager: bool,
        /// number of `PingMsg` received
        pub count: u64,
    }
826
827    impl PongerAct {
828        /// Creates a new `PongerAct` replying to `PingMsg`'s using
829        /// [Lazy Serialisation](crate::actors::ActorPath#method.tell)
830        pub fn new_lazy() -> PongerAct {
831            PongerAct {
832                ctx: ComponentContext::uninitialised(),
833                eager: false,
834                count: 0,
835            }
836        }
837
838        /// Creates a new `PongerAct` replying to `PingMsg`'s using
839        /// [Eager Serialisation](crate::actors::ActorPath#method.tell_serialised)
840        pub fn new_eager() -> PongerAct {
841            PongerAct {
842                ctx: ComponentContext::uninitialised(),
843                eager: true,
844                count: 0,
845            }
846        }
847    }
848
849    ignore_lifecycle!(PongerAct);
850
    // Network-only actor: answers every `PingMsg` with a `PongMsg` carrying the same `i`.
    impl Actor for PongerAct {
        type Message = Never;

        fn receive_local(&mut self, _ping: Self::Message) -> HandlerResult {
            unimplemented!()
        }

        fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
            // Keep the sender around; the reply goes straight back to it.
            let sender = msg.sender;
            // Deserialisation failures are logged and otherwise ignored.
            match_deser! {
                (msg.data) {
                    msg(ping): PingMsg [using PingPongSer] => {
                        debug!(self.ctx.log(), "Got msg {:?} from {}", ping, sender);
                        self.count += 1;
                        let pong = PongMsg { i: ping.i };
                        if self.eager {
                            sender
                                .tell_serialised(pong, self)
                                .expect("PongMsg should serialise");
                        } else {
                            sender.tell(pong, self);
                        }
                    },
                    err(e) => error!(self.ctx.log(), "Error deserialising PingMsg: {:?}", e),
                }
            }
            Handled::OK
        }
    }
880
    /// An actor which forwards all network messages to `forward_to`
    #[derive(ComponentDefinition)]
    pub struct ForwarderAct {
        ctx: ComponentContext<Self>,
        // Destination every received network message is forwarded to.
        forward_to: ActorPath,
    }
887    impl ForwarderAct {
888        /// Creates a new `ForwarderAct`
889        pub fn new(forward_to: ActorPath) -> Self {
890            ForwarderAct {
891                ctx: ComponentContext::uninitialised(),
892                forward_to,
893            }
894        }
895    }
896    ignore_lifecycle!(ForwarderAct);
    // Network-only actor: relays each message to `forward_to` via
    // `forward_with_original_sender`.
    impl Actor for ForwarderAct {
        type Message = Never;

        fn receive_local(&mut self, _ping: Self::Message) -> HandlerResult {
            unimplemented!()
        }

        fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
            debug!(
                self.ctx.log(),
                "Forwarding some msg from {} to {}", msg.sender, self.forward_to
            );
            self.forward_to.forward_with_original_sender(msg, self);
            Handled::OK
        }
    }
913
914    #[derive(Clone)]
915    struct BigPingMsg {
916        i: u64,
917        data: Vec<u8>,
918        sum: u64,
919    }
920
921    impl BigPingMsg {
922        fn new(i: u64, data_size: usize) -> Self {
923            let mut data = Vec::<u8>::with_capacity(data_size);
924            let mut sum: u64 = 0;
925            // create pseudo random values:
926            let seed = SystemTime::now()
927                .duration_since(UNIX_EPOCH)
928                .expect("Time went backwards")
929                .as_millis() as u64;
930            for _ in 0..data_size {
931                let d = (seed * (2 + i) % 128) as u8;
932                data.push(d);
933                sum += d as u64;
934            }
935            BigPingMsg { i, data, sum }
936        }
937
938        fn validate(&self) {
939            let mut sum: u64 = 0;
940            for d in &self.data {
941                sum += *d as u64;
942            }
943            assert_eq!(self.sum, sum, "BigPingMsg invalid, {:?}", self);
944        }
945    }
946
947    #[derive(Clone)]
948    struct BigPongMsg {
949        i: u64,
950        data: Vec<u8>,
951        sum: u64,
952    }
953
954    impl BigPongMsg {
955        fn new(i: u64, data_size: usize) -> BigPongMsg {
956            let mut data = Vec::<u8>::with_capacity(data_size);
957            let mut sum: u64 = 0;
958            // create pseudo random values:
959            let seed = SystemTime::now()
960                .duration_since(UNIX_EPOCH)
961                .expect("Time went backwards")
962                .as_millis() as u64;
963            for _ in 0..data_size {
964                let d = (seed * (2 + i) % 128) as u8;
965                data.push(d);
966                sum += d as u64;
967            }
968            BigPongMsg { i, data, sum }
969        }
970
971        fn validate(&self) {
972            let mut sum: u64 = 0;
973            for d in &self.data {
974                sum += *d as u64;
975            }
976            assert_eq!(self.sum, sum, "BigPongMsg invalid, {:?}", self);
977        }
978    }
979
980    impl Debug for BigPingMsg {
981        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
982            f.debug_struct("BigPingMsg")
983                .field("i", &self.i)
984                .field("data length", &self.data.len())
985                .field("sum", &self.sum)
986                .field("data 0", &self.data[0])
987                .field("data n", &self.data[&self.data.len() - 1])
988                .finish()
989        }
990    }
991    impl Debug for BigPongMsg {
992        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
993            f.debug_struct("BigPongMsg")
994                .field("i", &self.i)
995                .field("data length", &self.data.len())
996                .field("sum", &self.sum)
997                .field("data 0", &self.data[0])
998                .field("data n", &self.data[&self.data.len() - 1])
999                .finish()
1000        }
1001    }
1002
1003    struct BigPingPongSer;
1004
1005    impl BigPingPongSer {
1006        const SID: SerId = 43;
1007    }
1008
1009    impl Serialisable for BigPingMsg {
1010        fn ser_id(&self) -> SerId {
1011            43
1012        }
1013
1014        fn size_hint(&self) -> Option<usize> {
1015            Some(32 + self.data.len())
1016        }
1017
1018        fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
1019            buf.put_i8(PING_ID);
1020            buf.put_u64(self.i);
1021            buf.put_u64(self.data.len() as u64);
1022            for d in &self.data {
1023                buf.put_u8(*d);
1024            }
1025            buf.put_u64(self.sum);
1026            Result::Ok(())
1027        }
1028
1029        fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
1030            Ok(self)
1031        }
1032    }
1033
1034    impl Serialisable for BigPongMsg {
1035        fn ser_id(&self) -> SerId {
1036            43
1037        }
1038
1039        fn size_hint(&self) -> Option<usize> {
1040            Some(32 + self.data.len())
1041        }
1042
1043        fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
1044            buf.put_i8(PONG_ID);
1045            buf.put_u64(self.i);
1046            buf.put_u64(self.data.len() as u64);
1047            for d in &self.data {
1048                buf.put_u8(*d);
1049            }
1050            buf.put_u64(self.sum);
1051            Result::Ok(())
1052        }
1053
1054        fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
1055            Ok(self)
1056        }
1057    }
1058
1059    impl Deserialiser<BigPingMsg> for BigPingPongSer {
1060        const SER_ID: SerId = Self::SID;
1061
1062        fn deserialise(buf: &mut dyn Buf) -> Result<BigPingMsg, SerError> {
1063            if buf.remaining() < 18 {
1064                return Err(SerError::invalid_data(format!(
1065                    "Serialised typed has 18 bytes but only {} bytes remain in buffer.",
1066                    buf.remaining()
1067                )));
1068            }
1069            match buf.get_i8() {
1070                PING_ID => {
1071                    let i = buf.get_u64();
1072                    let data_len = buf.get_u64() as usize;
1073                    let mut data = Vec::with_capacity(data_len);
1074                    for j in 0..data_len {
1075                        data.insert(j, buf.get_u8());
1076                    }
1077                    let sum = buf.get_u64();
1078                    assert_eq!(buf.remaining(), 0, "Buffer too big! {}", buf.remaining());
1079                    Ok(BigPingMsg { i, data, sum })
1080                }
1081                PONG_ID => Err(SerError::invalid_type(
1082                    "Found BigPongMsg, but expected BigPingMsg.",
1083                )),
1084                id => Err(SerError::invalid_type(format!(
1085                    "Found unknown id {}, but expected BigPingMsg.",
1086                    id
1087                ))),
1088            }
1089        }
1090    }
1091
1092    impl Deserialiser<BigPongMsg> for BigPingPongSer {
1093        const SER_ID: SerId = Self::SID;
1094
1095        fn deserialise(buf: &mut dyn Buf) -> Result<BigPongMsg, SerError> {
1096            if buf.remaining() < 18 {
1097                return Err(SerError::invalid_data(format!(
1098                    "Serialised typed has 18 bytes but only {} bytes remain in buffer.",
1099                    buf.remaining()
1100                )));
1101            }
1102            match buf.get_i8() {
1103                PONG_ID => {
1104                    let i = buf.get_u64();
1105                    let data_len = buf.get_u64() as usize;
1106                    let mut data = Vec::with_capacity(data_len);
1107                    for j in 0..data_len {
1108                        data.insert(j, buf.get_u8());
1109                    }
1110                    let sum = buf.get_u64();
1111                    assert_eq!(buf.remaining(), 0, "Buffer too big!");
1112                    Ok(BigPongMsg { i, data, sum })
1113                }
1114                PING_ID => Err(SerError::invalid_type(
1115                    "Found BigPingMsg, but expected BigPongMsg.",
1116                )),
1117                id => Err(SerError::invalid_type(format!(
1118                    "Found unknown id {}, but expected BigPingMsg.",
1119                    id
1120                ))),
1121            }
1122        }
1123    }
1124
1125    /// An actor which will send `BigPingMsg` to `target` over the network and counts the number of
1126    /// `BigPongMsg` replies it receives.
1127    ///
1128    /// The Actors can be configured with different data-sizes and fills each message with
1129    /// pseudo-random data and a checksum. When receiving a message the actors verify the checksum.
1130    ///
1131    /// Target should be a [BigPongerAct](BigPongerAct).
1132    #[derive(ComponentDefinition)]
1133    pub struct BigPingerAct {
1134        ctx: ComponentContext<BigPingerAct>,
1135        target: ActorPath,
1136        /// The number of `BigPongMsg` this actor has received
1137        pub count: u64,
1138        data_size: usize,
1139        eager: bool,
1140        buffer_config: BufferConfig,
1141        pre_serialised: Option<VecDeque<ChunkRef>>,
1142        promise: Option<KPromise<()>>,
1143    }
1144
1145    #[allow(dead_code)]
1146    impl BigPingerAct {
1147        /// Creates a `BigPingerAct` which will send messages using
1148        /// [lazy serialisation](crate::actors::ActorPath#tell)
1149        ///
1150        /// `target` should be a [BigPongerAct]() and `data_size` will determine the size of the
1151        /// u8 array generated and embedded in each `BigPingMsg`.
1152        pub fn new_lazy(target: ActorPath, data_size: usize) -> BigPingerAct {
1153            BigPingerAct {
1154                ctx: ComponentContext::uninitialised(),
1155                target,
1156                count: 0,
1157                data_size,
1158                eager: false,
1159                buffer_config: BufferConfig::default(),
1160                pre_serialised: None,
1161                promise: None,
1162            }
1163        }
1164
1165        /// Creates a `BigPingerAct` which will send messages using
1166        /// [eager serialisation](crate::actors::ActorPath#tell_serialised).
1167        pub fn new_eager(
1168            target: ActorPath,
1169            data_size: usize,
1170            buffer_config: BufferConfig,
1171        ) -> BigPingerAct {
1172            BigPingerAct {
1173                ctx: ComponentContext::uninitialised(),
1174                target,
1175                count: 0,
1176                data_size,
1177                eager: true,
1178                buffer_config,
1179                pre_serialised: None,
1180                promise: None,
1181            }
1182        }
1183
1184        /// Creates a `BigPingerAct` which will send messages using
1185        /// [Preserialisation](crate::actors::ActorPath#tell_preserialised).
1186        pub fn new_preserialised(
1187            target: ActorPath,
1188            data_size: usize,
1189            buffer_config: BufferConfig,
1190        ) -> BigPingerAct {
1191            let pre_serialised = VecDeque::with_capacity(PING_COUNT as usize);
1192            BigPingerAct {
1193                ctx: ComponentContext::uninitialised(),
1194                target,
1195                count: 0,
1196                data_size,
1197                eager: true,
1198                buffer_config,
1199                pre_serialised: Some(pre_serialised),
1200                promise: None,
1201            }
1202        }
1203
1204        /// Creates a future which will be completed by the Pinger when it has received pongs for
1205        /// all the pings it sent
1206        pub fn completion_future(&mut self) -> KFuture<()> {
1207            let (promise, future) = promise();
1208            self.promise = Some(promise);
1209            future
1210        }
1211    }
1212
1213    impl ComponentLifecycle for BigPingerAct {
1214        fn on_start(&mut self) -> HandlerResult {
1215            let ping = BigPingMsg::new(0, self.data_size);
1216            if self.eager {
1217                self.ctx
1218                    .init_buffers(Some(self.buffer_config.clone()), None);
1219            }
1220            debug!(self.ctx.log(), "Starting, sending first ping: {:?}", ping);
1221            if self.eager {
1222                if let Some(msgs) = &mut self.pre_serialised {
1223                    for i in 0..PING_COUNT {
1224                        msgs.push_back(
1225                            self.ctx
1226                                .preserialise(&BigPingMsg::new(i, self.data_size))
1227                                .expect("serialise"),
1228                        );
1229                    }
1230                    self.target
1231                        .tell_preserialised(msgs.pop_front().expect("popping"), self)
1232                        .expect("telling");
1233                } else {
1234                    self.target.tell_serialised(ping, self).expect("serialise");
1235                }
1236            } else {
1237                self.target.tell(ping, self);
1238            }
1239            Handled::OK
1240        }
1241    }
1242
1243    impl Actor for BigPingerAct {
1244        type Message = Never;
1245
1246        fn receive_local(&mut self, _pong: Self::Message) -> HandlerResult {
1247            unimplemented!()
1248        }
1249
1250        fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
1251            match msg.try_deserialise::<BigPongMsg, BigPingPongSer>() {
1252                Ok(pong) => {
1253                    debug!(self.ctx.log(), "Got msg {:?}", pong);
1254                    pong.validate();
1255                    assert_eq!(
1256                        pong.data.len(),
1257                        self.data_size,
1258                        "Incorrect Pong data-length"
1259                    );
1260                    assert_eq!(pong.i, self.count % PING_COUNT, "Incorrect index of pong!");
1261                    self.count += 1;
1262                    if !self.count.is_multiple_of(PING_COUNT) {
1263                        if self.eager {
1264                            if let Some(msgs) = &mut self.pre_serialised {
1265                                self.target
1266                                    .tell_preserialised(msgs.pop_front().expect("popping"), self)
1267                                    .expect("telling");
1268                            } else {
1269                                let ping = BigPingMsg::new(pong.i + 1, self.data_size);
1270                                self.target.tell_serialised(ping, self).expect("serialise");
1271                            }
1272                        } else {
1273                            let ping = BigPingMsg::new(pong.i + 1, self.data_size);
1274                            self.target.tell(ping, self);
1275                        }
1276                    } else if let Some(promise) = self.promise.take() {
1277                        promise.complete().expect("Failed to fulfil promise");
1278                    }
1279                }
1280                Err(e) => error!(self.ctx.log(), "Error deserialising BigPongMsg: {:?}", e),
1281            }
1282            Handled::OK
1283        }
1284    }
1285
1286    /// An actor which will reply to `BigPingMsg` over the network with `BigPongMsg`
1287    ///
1288    /// Verifies the checksum of each `BigPingMsg` and replies with an equally sized `BigPongMsg`.
1289    #[derive(ComponentDefinition)]
1290    pub struct BigPongerAct {
1291        ctx: ComponentContext<BigPongerAct>,
1292        eager: bool,
1293        buffer_config: BufferConfig,
1294    }
1295
1296    #[allow(dead_code)]
1297    impl BigPongerAct {
1298        /// Creates a `BigPongerAct` which will send messages using
1299        /// [Lazy Serialisation](crate::actors::ActorPath#tell)
1300        pub fn new_lazy() -> BigPongerAct {
1301            BigPongerAct {
1302                ctx: ComponentContext::uninitialised(),
1303                eager: false,
1304                buffer_config: BufferConfig::default(),
1305            }
1306        }
1307
1308        /// Creates a `BigPingerAct` which will send messages using
1309        /// [Eager Serialisation](crate::actors::ActorPath#tell_serialised)
1310        pub fn new_eager(buffer_config: BufferConfig) -> BigPongerAct {
1311            BigPongerAct {
1312                ctx: ComponentContext::uninitialised(),
1313                eager: true,
1314                buffer_config,
1315            }
1316        }
1317    }
1318
1319    impl ComponentLifecycle for BigPongerAct {
1320        fn on_start(&mut self) -> HandlerResult {
1321            if self.eager {
1322                self.ctx
1323                    .init_buffers(Some(self.buffer_config.clone()), None);
1324            }
1325            Handled::OK
1326        }
1327    }
1328
1329    impl Actor for BigPongerAct {
1330        type Message = Never;
1331
1332        fn receive_local(&mut self, _ping: Self::Message) -> HandlerResult {
1333            unimplemented!()
1334        }
1335
1336        fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
1337            let sender = msg.sender;
1338            match_deser! {
1339                (msg.data) {
1340                    msg(ping): BigPingMsg [using BigPingPongSer] => {
1341                        debug!(self.ctx.log(), "Got msg {:?} from {}", ping, sender);
1342                        ping.validate();
1343                        let pong = BigPongMsg::new(ping.i, ping.data.len());
1344                        if self.eager {
1345                            sender
1346                                .tell_serialised(pong, self)
1347                                .expect("BigPongMsg should serialise");
1348                        } else {
1349                            sender.tell(pong, self);
1350                        }
1351                    },
1352                    err(e) => error!(self.ctx.log(), "Error deserialising BigPingMsg: {:?}", e),
1353                }
1354            }
1355            Handled::OK
1356        }
1357    }
1358
1359    /// Actor which can subscribe to the `NetworkStatusPort` and maintains a counter of how many
1360    /// of each kind of NetworkStatusUpdate has been received.
1361    #[derive(ComponentDefinition)]
1362    pub struct NetworkStatusCounter {
1363        ctx: ComponentContext<NetworkStatusCounter>,
1364        /// The NetworkStatusPort, needs to be exposed to allow connecting to it
1365        pub network_status_port: RequiredPort<NetworkStatusPort>,
1366        /// Counts the number of connection_established messages received
1367        pub connection_established: u32,
1368        /// Counts the number of connection_lost messages received
1369        pub connection_lost: u32,
1370        /// Counts the number of connection_dropped messages received
1371        pub connection_dropped: u32,
1372        /// Counts the number of connection_closed messages received
1373        pub connection_closed: u32,
1374        /// Contains all `SystemPath`'s received in a `ConnectionEstablished`
1375        pub connected_systems: Vec<(SystemPath, SessionId)>,
1376        /// Contains all `SystemPath`'s received in a `ConnectionLost` or `ConnectionClosed` message
1377        pub disconnected_systems: Vec<(SystemPath, SessionId)>,
1378        /// Counts the number of `SoftConnectionLimitExceeded` messages received
1379        pub soft_connection_limit_exceeded: u32,
1380        /// Counts the number of `HardConnectionLimitReached` messages received
1381        pub hard_connection_limit_reached: u32,
1382        /// Counts the number of network_out_of_buffers messages received
1383        pub network_out_of_buffers: u32,
1384        /// Counts the number of `CriticalNetworkFailure` messages received
1385        pub critical_network_failure: u32,
1386        network_status_queue_sender: Option<Sender<NetworkStatus>>,
1387        started_promise: Option<KPromise<()>>,
1388    }
1389
1390    impl NetworkStatusCounter {
1391        /// Creates a new uninitialised NetworkStatusCounter with all counters set to 0
1392        pub fn new() -> NetworkStatusCounter {
1393            Self {
1394                ctx: ComponentContext::uninitialised(),
1395                network_status_port: RequiredPort::<NetworkStatusPort>::uninitialised(),
1396                connection_established: 0,
1397                connection_lost: 0,
1398                connection_dropped: 0,
1399                connection_closed: 0,
1400                connected_systems: Vec::new(),
1401                disconnected_systems: Vec::new(),
1402                soft_connection_limit_exceeded: 0,
1403                hard_connection_limit_reached: 0,
1404                network_out_of_buffers: 0,
1405                critical_network_failure: 0,
1406                network_status_queue_sender: None,
1407                started_promise: None,
1408            }
1409        }
1410
1411        /// Sets a sender which the NetworkStatusCounter will foward all the NetworkStatus events to
1412        pub fn set_status_sender(&mut self, sender: Sender<NetworkStatus>) {
1413            self.network_status_queue_sender = Some(sender);
1414        }
1415
1416        /// triggers the given `request` on the NetworkStatusPort
1417        pub fn send_status_request(&mut self, request: NetworkStatusRequest) {
1418            debug!(self.ctx.log(), "Sending Status Request. {:?}", request);
1419            self.network_status_port.trigger(request);
1420        }
1421
1422        /// Creates a future that will be fulfilled when the component starts
1423        pub fn started_future(&mut self) -> KFuture<()> {
1424            let (promise, future) = promise();
1425            self.started_promise = Some(promise);
1426            future
1427        }
1428
1429        /// Creates a faulty Message which will cause the local NetworkThread to panic
1430        /// when it tries to send it to the remote host.
1431        ///
1432        /// Fails to corrupt the network if there are no connected systems to use as receiver
1433        /// for the corrupt message and returns an error instead.
1434        pub fn corrupt_network(&mut self) -> Result<(), SerError> {
1435            if let Some((receiving_system, _)) = self
1436                .connected_systems
1437                .iter()
1438                .find(|connected_system| !self.disconnected_systems.contains(connected_system))
1439            {
1440                warn!(self.ctx.log(), "Corrupting the local Network layer");
1441                let receiver = ActorPath::from((receiving_system.clone(), Uuid::new_v4()));
1442                let mut corrupted_chunk = self.ctx.preserialise(&PongMsg { i: 1 }).unwrap();
1443                corrupted_chunk.advance(50);
1444
1445                let corrupted_message =
1446                    DispatchData::Serialised(SerialisedFrame::ChunkRef(corrupted_chunk));
1447
1448                self.ctx
1449                    .system()
1450                    .dispatcher_ref()
1451                    .tell(DispatchEnvelope::Msg {
1452                        src: self.ctx.actor_path(),
1453                        dst: receiver,
1454                        msg: corrupted_message,
1455                    });
1456
1457                Ok(())
1458            } else {
1459                Err(SerError::unknown("No Valid Receivers"))
1460            }
1461        }
1462    }
1463
1464    impl Default for NetworkStatusCounter {
1465        fn default() -> Self {
1466            Self::new()
1467        }
1468    }
1469
1470    impl ComponentLifecycle for NetworkStatusCounter {
1471        fn on_start(&mut self) -> HandlerResult {
1472            if let Some(promise) = self.started_promise.take() {
1473                promise.complete().expect("Failed to fulfil promise");
1474            }
1475            Handled::OK
1476        }
1477
1478        fn on_stop(&mut self) -> HandlerResult {
1479            Handled::OK
1480        }
1481
1482        fn on_kill(&mut self) -> HandlerResult {
1483            Handled::OK
1484        }
1485    }
1486
1487    impl Actor for NetworkStatusCounter {
1488        type Message = ();
1489
1490        fn receive_local(&mut self, _msg: Self::Message) -> HandlerResult {
1491            unimplemented!()
1492        }
1493
1494        fn receive_network(&mut self, _msg: NetMessage) -> HandlerResult {
1495            unimplemented!("Ignoring network messages.")
1496        }
1497    }
1498
1499    impl Require<NetworkStatusPort> for NetworkStatusCounter {
1500        fn handle(&mut self, event: <NetworkStatusPort as Port>::Indication) -> HandlerResult {
1501            debug!(
1502                self.ctx.log(),
1503                "Got NetworkStatusPort indication. {:?}", event
1504            );
1505            if let Some(sender) = &self.network_status_queue_sender {
1506                sender
1507                    .send(event.clone())
1508                    .expect("StatusCounter to send NetworkStatus");
1509            }
1510            match event {
1511                NetworkStatus::ConnectionEstablished(system_path, session) => {
1512                    self.connection_established += 1;
1513                    self.connected_systems.push((system_path, session));
1514                }
1515                NetworkStatus::ConnectionLost(system_path, session) => {
1516                    self.connection_lost += 1;
1517                    self.disconnected_systems.push((system_path, session));
1518                }
1519                NetworkStatus::ConnectionDropped(_) => {
1520                    self.connection_dropped += 1;
1521                }
1522                NetworkStatus::ConnectionClosed(system_path, session) => {
1523                    self.connection_closed += 1;
1524                    self.disconnected_systems.push((system_path, session));
1525                }
1526                NetworkStatus::SoftConnectionLimitExceeded => {
1527                    self.soft_connection_limit_exceeded += 1
1528                }
1529                NetworkStatus::HardConnectionLimitReached => {
1530                    self.hard_connection_limit_reached += 1
1531                }
1532                NetworkStatus::CriticalNetworkFailure => {
1533                    self.critical_network_failure += 1;
1534                }
1535                NetworkStatus::BlockedIpNet(_) => (),
1536                NetworkStatus::AllowedIpNet(_) => (),
1537                NetworkStatus::AllowedIp(_) => (),
1538                NetworkStatus::BlockedSystem(_) => (),
1539                NetworkStatus::BlockedIp(_) => (),
1540                NetworkStatus::AllowedSystem(_) => (),
1541            }
1542            Handled::OK
1543        }
1544    }
1545
1546    /// An actor that continuously sends `PingMsg` to a `target` over the network.
1547    /// Target should be a [PongerAct](PongerAct).
1548    #[derive(ComponentDefinition)]
1549    pub struct PingStream {
1550        ctx: ComponentContext<PingStream>,
1551        target: ActorPath,
1552        period: Duration,
1553        timer: Option<ScheduledTimer>,
1554        /// Sent Ping messages
1555        pub ping_count: u64,
1556        /// Received Pong messages
1557        pub pong_count: u64,
1558    }
1559
1560    impl PingStream {
1561        /// creates a `PingStream` actor that sends `PingMsg` to `target` every `period`
1562        /// Target should be a [PongerAct](PongerAct).
1563        pub fn new(target: ActorPath, period: Duration) -> Self {
1564            Self {
1565                ctx: ComponentContext::uninitialised(),
1566                target,
1567                period,
1568                timer: None,
1569                ping_count: 0,
1570                pong_count: 0,
1571            }
1572        }
1573
1574        /// method to start pinging
1575        pub fn start_pinging(&mut self) {
1576            let timer =
1577                self.schedule_periodic(Duration::from_millis(0), self.period, move |p, _| {
1578                    p.ping();
1579                    Handled::OK
1580                });
1581            self.timer = Some(timer);
1582        }
1583
1584        /// method to stop pinging
1585        pub fn stop_pinging(&mut self) {
1586            let timer = self.timer.take().expect("No timer");
1587            self.cancel_timer(timer);
1588        }
1589
1590        fn ping(&mut self) {
1591            self.ping_count += 1;
1592            self.target
1593                .tell_serialised(PingMsg { i: self.ping_count }, self)
1594                .expect("serialise");
1595        }
1596    }
1597
1598    impl ComponentLifecycle for PingStream {
1599        fn on_start(&mut self) -> HandlerResult {
1600            debug!(self.ctx.log(), "Starting");
1601            self.start_pinging();
1602            Handled::OK
1603        }
1604    }
1605
1606    impl Actor for PingStream {
1607        type Message = Never;
1608
1609        fn receive_local(&mut self, _: Self::Message) -> HandlerResult {
1610            unimplemented!()
1611        }
1612
1613        fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
1614            match msg.try_deserialise::<PongMsg, PingPongSer>() {
1615                Ok(pong) => {
1616                    debug!(self.ctx.log(), "Got msg {:?}", pong);
1617                    self.pong_count += 1;
1618                }
1619                Err(e) => error!(self.ctx.log(), "Error deserialising PongMsg: {:?}", e),
1620            }
1621            Handled::OK
1622        }
1623    }
1624}