1use super::*;
2use actors::Transport;
3use arc_swap::ArcSwap;
4use dispatch::lookup::ActorStore;
5
6use crate::{
7 NetworkStatus,
8 dispatch::NetworkConfig,
9 events::NetworkDispatcherEvent,
10 messaging::{DispatchData, DispatchEnvelope},
11 net::{SessionId, events::DispatchEvent, frames::*, network_thread::NetworkThreadBuilder},
12};
13use crossbeam_channel::{RecvError, SendError, Sender, unbounded as channel};
14use ipnet::IpNet;
15use kompact::runtime::{BacktraceSuffix, SourceCause};
16use mio::{Interest, Waker};
17use snafu::{IntoError, ResultExt};
18pub use std::net::SocketAddr;
19use std::{io, net::IpAddr, panic, sync::Arc, thread, time::Duration};
20
21pub(crate) mod network_channel;
22pub mod network_thread;
23pub(crate) mod udp_state;
24
25#[derive(Clone, Debug)]
27pub enum ConnectionState {
28 New,
30 Initializing,
32 Connected(SessionId),
34 Closed(SessionId),
36 Lost(SessionId),
38 Blocked,
40 }
43
44pub enum Protocol {
45 Tcp,
46 Udp,
47}
48impl From<Transport> for Protocol {
49 fn from(t: Transport) -> Self {
50 match t {
51 Transport::Tcp => Protocol::Tcp,
52 Transport::Udp => Protocol::Udp,
53 _ => unimplemented!("Unsupported Protocol"),
54 }
55 }
56}
57
58pub mod events {
60
61 use crate::{messaging::DispatchData, net::SocketAddr};
62 use ipnet::IpNet;
63 use std::net::IpAddr;
64
65 #[derive(Debug)]
67 pub enum DispatchEvent {
68 SendTcp(SocketAddr, DispatchData),
70 SendUdp(SocketAddr, DispatchData),
72 Stop,
74 Kill,
76 Connect(SocketAddr),
78 ClosedAck(SocketAddr),
80 Close(SocketAddr),
82 BlockSocket(SocketAddr),
84 BlockIpAddr(IpAddr),
86 BlockIpNet(IpNet),
88 AllowSocket(SocketAddr),
90 AllowIpAddr(IpAddr),
92 AllowIpNet(IpNet),
94 }
95}
96
97#[allow(dead_code)]
99pub struct BridgeConfig {
100 retry_strategy: RetryStrategy,
101}
102
103impl BridgeConfig {
104 #[allow(dead_code)]
108 pub fn new() -> Self {
109 BridgeConfig::default()
110 }
111}
112
113#[allow(dead_code)]
114enum RetryStrategy {
115 ExponentialBackoff { base_ms: u64, num_tries: usize },
116}
117
118#[allow(dead_code)]
119impl Default for BridgeConfig {
120 fn default() -> Self {
121 let retry_strategy = RetryStrategy::ExponentialBackoff {
122 base_ms: 100,
123 num_tries: 5,
124 };
125 BridgeConfig { retry_strategy }
126 }
127}
128
129pub struct Bridge {
131 log: KompactLogger,
135 network_input_queue: Sender<events::DispatchEvent>,
141 waker: Waker,
142 dispatcher: Option<DispatcherRef>,
146 bound_address: Option<SocketAddr>,
148 shutdown_future: KFuture<()>,
149}
150
151impl Bridge {
152 pub fn new(
158 lookup: Arc<ArcSwap<ActorStore>>,
159 network_thread_log: KompactLogger,
160 bridge_log: KompactLogger,
161 addr: SocketAddr,
162 dispatcher_ref: DispatcherRef,
163 network_config: &NetworkConfig,
164 ) -> (Self, SocketAddr) {
165 let (sender, receiver) = channel();
166 let (shutdown_p, shutdown_f) = promise();
167 match NetworkThreadBuilder::new(
168 network_thread_log,
169 addr,
170 lookup,
171 receiver,
172 shutdown_p,
173 dispatcher_ref.clone(),
174 network_config.clone(),
175 ) {
176 Ok(mut network_thread_builder) => {
177 let bound_address = network_thread_builder.address;
178 let waker = network_thread_builder
179 .take_waker()
180 .expect("NetworkThread poll error");
181
182 let (started_p, started_f) = promise();
183 run_network_thread(
184 network_thread_builder,
185 bridge_log.clone(),
186 started_p,
187 dispatcher_ref.clone(),
188 )
189 .expect("Failed to spawn NetworkThread");
190 started_f
191 .wait_timeout(Duration::from_millis(network_config.get_boot_timeout()))
192 .expect("NetworkThread time-out during boot sequence");
193
194 let bridge = Bridge {
195 log: bridge_log,
197 network_input_queue: sender,
199 waker,
200 dispatcher: Some(dispatcher_ref),
201 bound_address: Some(bound_address),
202 shutdown_future: shutdown_f,
203 };
204
205 (bridge, bound_address)
206 }
207 Err(e) => {
208 panic!("Failed to build a Network Thread, error: {:?}", e);
209 }
210 }
211 }
212
213 fn send_and_wake(&self, event: DispatchEvent) -> Result<(), NetworkBridgeError> {
214 self.network_input_queue
215 .send(event)
216 .context(network_bridge_error::SendSnafu)?;
217 self.waker.wake().context(network_bridge_error::IoSnafu)?;
218 Ok(())
219 }
220
221 pub fn set_dispatcher(&mut self, dispatcher: DispatcherRef) -> Option<DispatcherRef> {
223 self.dispatcher.replace(dispatcher)
224 }
225
226 pub fn stop(self) -> Result<(), NetworkBridgeError> {
228 debug!(self.log, "Stopping NetworkBridge...");
229 self.send_and_wake(DispatchEvent::Stop)?;
230 self.shutdown_future.wait(); debug!(self.log, "Stopped NetworkBridge.");
232 Ok(())
233 }
234
235 pub fn kill(self) -> Result<(), NetworkBridgeError> {
237 debug!(self.log, "Killing NetworkBridge...");
238 self.send_and_wake(DispatchEvent::Kill)?;
239 self.shutdown_future.wait(); debug!(self.log, "Stopped NetworkBridge.");
241 Ok(())
242 }
243
244 pub fn local_addr(&self) -> &Option<SocketAddr> {
246 &self.bound_address
247 }
248
249 pub(crate) fn route(
251 &self,
252 addr: SocketAddr,
253 data: DispatchData,
254 protocol: Protocol,
255 ) -> Result<(), NetworkBridgeError> {
256 let event = match protocol {
257 Protocol::Tcp => DispatchEvent::SendTcp(addr, data),
258 Protocol::Udp => DispatchEvent::SendUdp(addr, data),
259 };
260 self.send_and_wake(event)
261 }
262
263 pub fn connect(&self, proto: Transport, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
273 match proto {
274 Transport::Tcp => self.send_and_wake(DispatchEvent::Connect(addr)),
275 other => Err(NetworkBridgeError::unsupported_protocol(other)),
276 }
277 }
278
279 pub fn ack_closed(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
281 self.send_and_wake(DispatchEvent::ClosedAck(addr))
282 }
283
284 pub fn close_channel(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
286 self.send_and_wake(DispatchEvent::Close(addr))
287 }
288
289 pub fn block_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
291 self.send_and_wake(DispatchEvent::BlockSocket(addr))
292 }
293
294 pub fn block_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeError> {
296 self.send_and_wake(DispatchEvent::BlockIpAddr(ip_addr))
297 }
298
299 pub fn block_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeError> {
301 self.send_and_wake(DispatchEvent::BlockIpNet(ip_net))
302 }
303
304 pub fn allow_socket(&self, addr: SocketAddr) -> Result<(), NetworkBridgeError> {
306 self.send_and_wake(DispatchEvent::AllowSocket(addr))
307 }
308
309 pub fn allow_ip(&self, ip_addr: IpAddr) -> Result<(), NetworkBridgeError> {
311 self.send_and_wake(DispatchEvent::AllowIpAddr(ip_addr))
312 }
313
314 pub fn allow_ip_net(&self, ip_net: IpNet) -> Result<(), NetworkBridgeError> {
316 self.send_and_wake(DispatchEvent::AllowIpNet(ip_net))
317 }
318}
319
320fn run_network_thread(
321 builder: NetworkThreadBuilder,
322 logger: KompactLogger,
323 started_promise: KPromise<()>,
324 dispatcher_ref: DispatcherRef,
325) -> async_std::io::Result<()> {
326 thread::Builder::new()
327 .name("network_thread".to_string())
328 .spawn(move || {
329 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
330 let network_thread = builder.build();
331 started_promise
332 .complete()
333 .expect("NetworkThread started but failed to fulfil promise");
334 network_thread.run()
335 })) {
336 if let Some(error_msg) = e.downcast_ref::<&str>() {
337 error!(logger, "NetworkThread panicked with: {:?}", &error_msg);
338 } else if let Some(error_msg) = e.downcast_ref::<String>() {
339 error!(logger, "NetworkThread panicked with: {:?}", error_msg);
340 } else {
341 error!(
342 logger,
343 "NetworkThread panicked with type id={:?}",
344 (*e).type_id()
345 );
346 }
347 dispatcher_ref.tell(DispatchEnvelope::Event(Box::new(
348 NetworkDispatcherEvent::Network(NetworkStatus::CriticalNetworkFailure),
349 )))
350 }
351 })
352 .map(|_| ())
353}
354
355#[derive(Debug, snafu::Snafu)]
357pub struct NetworkBridgeError(Box<NetworkBridgeErrorKind>);
358
359#[derive(Debug, snafu::Snafu)]
360#[snafu(module(network_bridge_error), visibility(pub(crate)))]
361enum NetworkBridgeErrorKind {
362 #[snafu(display(
363 "network bridge does not support protocol {protocol:?} ({location}){}",
364 BacktraceSuffix(backtrace.as_ref())
365 ))]
366 UnsupportedProtocol {
367 protocol: Transport,
368 #[snafu(implicit)]
369 location: snafu::Location,
370 backtrace: Option<snafu::Backtrace>,
371 },
372 #[snafu(display(
373 "failed to send network bridge event ({location}).{}",
374 SourceCause(source, None, backtrace.as_ref())
375 ))]
376 Send {
377 source: SendError<DispatchEvent>,
378 #[snafu(implicit)]
379 location: snafu::Location,
380 backtrace: Option<snafu::Backtrace>,
381 },
382 #[snafu(display(
383 "network bridge IO failed ({location}).{}",
384 SourceCause(source, None, backtrace.as_ref())
385 ))]
386 Io {
387 source: io::Error,
388 #[snafu(implicit)]
389 location: snafu::Location,
390 backtrace: Option<snafu::Backtrace>,
391 },
392 #[snafu(display(
393 "network bridge receive failed ({location}).{}",
394 SourceCause(source, None, backtrace.as_ref())
395 ))]
396 Receive {
397 source: RecvError,
398 #[snafu(implicit)]
399 location: snafu::Location,
400 backtrace: Option<snafu::Backtrace>,
401 },
402 #[snafu(display(
403 "network bridge serialisation failed ({location}).{}",
404 SourceCause(source, None, backtrace.as_ref())
405 ))]
406 Serialisation {
407 source: SerError,
408 #[snafu(implicit)]
409 location: snafu::Location,
410 backtrace: Option<snafu::Backtrace>,
411 },
412 #[snafu(display(
413 "network bridge failed: {message} ({location}){}",
414 BacktraceSuffix(backtrace.as_ref())
415 ))]
416 Message {
417 message: String,
418 #[snafu(implicit)]
419 location: snafu::Location,
420 backtrace: Option<snafu::Backtrace>,
421 },
422}
423
424impl NetworkBridgeError {
425 #[track_caller]
427 pub fn unsupported_protocol(protocol: Transport) -> Self {
428 network_bridge_error::UnsupportedProtocolSnafu { protocol }
429 .build()
430 .into()
431 }
432
433 #[track_caller]
435 pub fn message(message: impl Into<String>) -> Self {
436 network_bridge_error::MessageSnafu {
437 message: message.into(),
438 }
439 .build()
440 .into()
441 }
442}
443
444impl From<SendError<DispatchEvent>> for NetworkBridgeError {
445 #[track_caller]
446 fn from(source: SendError<DispatchEvent>) -> Self {
447 network_bridge_error::SendSnafu.into_error(source).into()
448 }
449}
450
451impl From<io::Error> for NetworkBridgeError {
452 #[track_caller]
453 fn from(source: io::Error) -> Self {
454 network_bridge_error::IoSnafu.into_error(source).into()
455 }
456}
457
458impl From<RecvError> for NetworkBridgeError {
459 #[track_caller]
460 fn from(source: RecvError) -> Self {
461 network_bridge_error::ReceiveSnafu.into_error(source).into()
462 }
463}
464
465impl From<SerError> for NetworkBridgeError {
466 #[track_caller]
467 fn from(source: SerError) -> Self {
468 network_bridge_error::SerialisationSnafu
469 .into_error(source)
470 .into()
471 }
472}
473
474#[cfg(test)]
475mod bridge_error_tests {
476 use super::*;
477 use kompact::test_support::assert_display_matches;
478 use std::error::Error;
479
480 #[test]
481 fn network_bridge_error_display_includes_source_and_location() {
482 let source = io::Error::other("waker failed");
483 let error = NetworkBridgeError::from(source);
484
485 assert_display_matches(
486 &error.to_string(),
487 "\
488network bridge IO failed {LOC}.
489 Caused by: waker failed.",
490 file!(),
491 );
492 assert_eq!(
493 "waker failed",
494 error.source().expect("io source").to_string()
495 );
496 }
497
498 #[test]
499 fn network_bridge_send_error_preserves_send_error_source() {
500 let (sender, receiver) = channel::<DispatchEvent>();
501 drop(receiver);
502 let source = sender
503 .send(DispatchEvent::Stop)
504 .expect_err("send should fail without receiver");
505 let error: NetworkBridgeError = network_bridge_error::SendSnafu.into_error(source).into();
506
507 assert_display_matches(
508 &error.to_string(),
509 "\
510failed to send network bridge event {LOC}.
511 Caused by: sending on a disconnected channel.",
512 file!(),
513 );
514 assert_eq!(
515 "sending on a disconnected channel",
516 error.source().expect("send source").to_string()
517 );
518 }
519}
520
521pub(crate) fn would_block(err: &io::Error) -> bool {
525 err.kind() == io::ErrorKind::WouldBlock
526}
527
528pub(crate) fn interrupted(err: &io::Error) -> bool {
529 err.kind() == io::ErrorKind::Interrupted
530}
531
532pub(crate) fn no_buffer_space(err: &io::Error) -> bool {
533 err.kind() == io::ErrorKind::InvalidInput
534}
535
536pub(crate) fn connection_reset(err: &io::Error) -> bool {
537 err.kind() == io::ErrorKind::ConnectionReset
538}
539
540pub(crate) fn broken_pipe(err: &io::Error) -> bool {
541 err.kind() == io::ErrorKind::BrokenPipe
542}
543
544pub(crate) fn out_of_buffers(err: &SerError) -> bool {
545 err.kind() == SerErrorKind::NoBuffersAvailable
546}
547
548pub mod net_test_helpers {
550 use crate::{
551 NetworkStatus,
552 NetworkStatusPort,
553 NetworkStatusRequest,
554 messaging::{DispatchData, SerialisedFrame},
555 prelude::*,
556 };
557 use crossbeam_channel::Sender;
558 use std::{
559 cmp::Ordering,
560 collections::VecDeque,
561 fmt::{Debug, Formatter},
562 time::{Duration, SystemTime, UNIX_EPOCH},
563 };
564 use uuid::Uuid;
565
566 pub const PING_COUNT: u64 = 10;
568
569 #[derive(Debug, Clone)]
570 struct PingMsg {
571 i: u64,
572 }
573
574 #[derive(Debug, Clone)]
575 struct PongMsg {
576 i: u64,
577 }
578
579 #[derive(Debug, Clone)]
580 struct PingPongSer;
581
582 impl PingPongSer {
583 const SID: SerId = 42;
584 }
585
586 const PING_ID: i8 = 1;
587 const PONG_ID: i8 = 2;
588
589 impl Serialiser<PingMsg> for PingPongSer {
590 fn ser_id(&self) -> SerId {
591 Self::SID
592 }
593
594 fn size_hint(&self) -> Option<usize> {
595 Some(9)
596 }
597
598 fn serialise(&self, v: &PingMsg, buf: &mut dyn BufMut) -> Result<(), SerError> {
599 buf.put_i8(PING_ID);
600 buf.put_u64(v.i);
601 Result::Ok(())
602 }
603 }
604
605 impl Serialisable for PingMsg {
606 fn ser_id(&self) -> SerId {
607 42
608 }
609
610 fn size_hint(&self) -> Option<usize> {
611 Some(9)
612 }
613
614 fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
615 buf.put_i8(PING_ID);
616 buf.put_u64(self.i);
617 Result::Ok(())
618 }
619
620 fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
621 Ok(self)
622 }
623 }
624
625 impl Serialisable for PongMsg {
626 fn ser_id(&self) -> SerId {
627 42
628 }
629
630 fn size_hint(&self) -> Option<usize> {
631 Some(9)
632 }
633
634 fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
635 buf.put_i8(PONG_ID);
636 buf.put_u64(self.i);
637 Result::Ok(())
638 }
639
640 fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
641 Ok(self)
642 }
643 }
644
645 impl Serialiser<PongMsg> for PingPongSer {
646 fn ser_id(&self) -> SerId {
647 Self::SID
648 }
649
650 fn size_hint(&self) -> Option<usize> {
651 Some(9)
652 }
653
654 fn serialise(&self, v: &PongMsg, buf: &mut dyn BufMut) -> Result<(), SerError> {
655 buf.put_i8(PONG_ID);
656 buf.put_u64(v.i);
657 Result::Ok(())
658 }
659 }
660
661 impl Deserialiser<PingMsg> for PingPongSer {
662 const SER_ID: SerId = Self::SID;
663
664 fn deserialise(buf: &mut dyn Buf) -> Result<PingMsg, SerError> {
665 if buf.remaining() < 9 {
666 return Err(SerError::invalid_data(format!(
667 "Serialised typed has 9bytes but only {}bytes remain in buffer.",
668 buf.remaining()
669 )));
670 }
671 match buf.get_i8() {
672 PING_ID => {
673 let i = buf.get_u64();
674 Ok(PingMsg { i })
675 }
676 PONG_ID => Err(SerError::invalid_type(
677 "Found PongMsg, but expected PingMsg.",
678 )),
679 id => Err(SerError::invalid_type(format!(
680 "Found unknown id {}, but expected PingMsg.",
681 id
682 ))),
683 }
684 }
685 }
686
687 impl Deserialiser<PongMsg> for PingPongSer {
688 const SER_ID: SerId = Self::SID;
689
690 fn deserialise(buf: &mut dyn Buf) -> Result<PongMsg, SerError> {
691 if buf.remaining() < 9 {
692 return Err(SerError::invalid_data(format!(
693 "Serialised typed has 9bytes but only {}bytes remain in buffer.",
694 buf.remaining()
695 )));
696 }
697 match buf.get_i8() {
698 PONG_ID => {
699 let i = buf.get_u64();
700 Ok(PongMsg { i })
701 }
702 PING_ID => Err(SerError::invalid_type(
703 "Found PingMsg, but expected PongMsg.",
704 )),
705 id => Err(SerError::invalid_type(format!(
706 "Found unknown id {}, but expected PingMsg.",
707 id
708 ))),
709 }
710 }
711 }
712
713 #[derive(ComponentDefinition)]
717 pub struct PingerAct {
718 ctx: ComponentContext<PingerAct>,
719 target: ActorPath,
720 pub count: u64,
722 eager: bool,
723 promise: Option<KPromise<()>>,
724 pub pong_system_session: Option<SessionId>,
726 }
727
728 impl PingerAct {
729 pub fn new_lazy(target: ActorPath) -> PingerAct {
732 PingerAct {
733 ctx: ComponentContext::uninitialised(),
734 target,
735 count: 0,
736 eager: false,
737 promise: None,
738 pong_system_session: None,
739 }
740 }
741
742 pub fn new_eager(target: ActorPath) -> PingerAct {
745 PingerAct {
746 ctx: ComponentContext::uninitialised(),
747 target,
748 count: 0,
749 eager: true,
750 promise: None,
751 pong_system_session: None,
752 }
753 }
754
755 pub fn completion_future(&mut self) -> KFuture<()> {
758 let (promise, future) = promise();
759 self.promise = Some(promise);
760 future
761 }
762 }
763
764 impl ComponentLifecycle for PingerAct {
765 fn on_start(&mut self) -> HandlerResult {
766 debug!(self.ctx.log(), "Starting");
767 if self.eager {
768 self.target
769 .tell_serialised(PingMsg { i: 0 }, self)
770 .expect("serialise");
771 } else {
772 self.target.tell(PingMsg { i: 0 }, self);
773 }
774 Handled::OK
775 }
776 }
777
778 impl Actor for PingerAct {
779 type Message = Never;
780
781 fn receive_local(&mut self, _pong: Self::Message) -> HandlerResult {
782 unimplemented!()
783 }
784
785 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
786 if self.pong_system_session.is_none() {
787 self.pong_system_session = msg.session();
788 }
789 match msg.try_deserialise::<PongMsg, PingPongSer>() {
790 Ok(pong) => {
791 debug!(self.ctx.log(), "Got msg {:?}", pong);
792 self.count += 1;
793 match self.count.cmp(&PING_COUNT) {
794 Ordering::Less if self.eager => {
795 self.target
796 .tell_serialised(PingMsg { i: pong.i + 1 }, self)
797 .expect("serialise");
798 }
799 Ordering::Less => {
800 self.target.tell(PingMsg { i: pong.i + 1 }, self);
801 }
802 Ordering::Equal if self.promise.is_some() => {
803 self.promise
804 .take()
805 .unwrap()
806 .complete()
807 .expect("Failed to fulfil promise");
808 }
809 _ => (), }
811 }
812 Err(e) => error!(self.ctx.log(), "Error deserialising PongMsg: {:?}", e),
813 }
814 Handled::OK
815 }
816 }
817
818 #[derive(ComponentDefinition)]
820 pub struct PongerAct {
821 ctx: ComponentContext<PongerAct>,
822 eager: bool,
823 pub count: u64,
825 }
826
827 impl PongerAct {
828 pub fn new_lazy() -> PongerAct {
831 PongerAct {
832 ctx: ComponentContext::uninitialised(),
833 eager: false,
834 count: 0,
835 }
836 }
837
838 pub fn new_eager() -> PongerAct {
841 PongerAct {
842 ctx: ComponentContext::uninitialised(),
843 eager: true,
844 count: 0,
845 }
846 }
847 }
848
849 ignore_lifecycle!(PongerAct);
850
851 impl Actor for PongerAct {
852 type Message = Never;
853
854 fn receive_local(&mut self, _ping: Self::Message) -> HandlerResult {
855 unimplemented!()
856 }
857
858 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
859 let sender = msg.sender;
860 match_deser! {
861 (msg.data) {
862 msg(ping): PingMsg [using PingPongSer] => {
863 debug!(self.ctx.log(), "Got msg {:?} from {}", ping, sender);
864 self.count += 1;
865 let pong = PongMsg { i: ping.i };
866 if self.eager {
867 sender
868 .tell_serialised(pong, self)
869 .expect("PongMsg should serialise");
870 } else {
871 sender.tell(pong, self);
872 }
873 },
874 err(e) => error!(self.ctx.log(), "Error deserialising PingMsg: {:?}", e),
875 }
876 }
877 Handled::OK
878 }
879 }
880
881 #[derive(ComponentDefinition)]
883 pub struct ForwarderAct {
884 ctx: ComponentContext<Self>,
885 forward_to: ActorPath,
886 }
887 impl ForwarderAct {
888 pub fn new(forward_to: ActorPath) -> Self {
890 ForwarderAct {
891 ctx: ComponentContext::uninitialised(),
892 forward_to,
893 }
894 }
895 }
896 ignore_lifecycle!(ForwarderAct);
897 impl Actor for ForwarderAct {
898 type Message = Never;
899
900 fn receive_local(&mut self, _ping: Self::Message) -> HandlerResult {
901 unimplemented!()
902 }
903
904 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
905 debug!(
906 self.ctx.log(),
907 "Forwarding some msg from {} to {}", msg.sender, self.forward_to
908 );
909 self.forward_to.forward_with_original_sender(msg, self);
910 Handled::OK
911 }
912 }
913
914 #[derive(Clone)]
915 struct BigPingMsg {
916 i: u64,
917 data: Vec<u8>,
918 sum: u64,
919 }
920
921 impl BigPingMsg {
922 fn new(i: u64, data_size: usize) -> Self {
923 let mut data = Vec::<u8>::with_capacity(data_size);
924 let mut sum: u64 = 0;
925 let seed = SystemTime::now()
927 .duration_since(UNIX_EPOCH)
928 .expect("Time went backwards")
929 .as_millis() as u64;
930 for _ in 0..data_size {
931 let d = (seed * (2 + i) % 128) as u8;
932 data.push(d);
933 sum += d as u64;
934 }
935 BigPingMsg { i, data, sum }
936 }
937
938 fn validate(&self) {
939 let mut sum: u64 = 0;
940 for d in &self.data {
941 sum += *d as u64;
942 }
943 assert_eq!(self.sum, sum, "BigPingMsg invalid, {:?}", self);
944 }
945 }
946
947 #[derive(Clone)]
948 struct BigPongMsg {
949 i: u64,
950 data: Vec<u8>,
951 sum: u64,
952 }
953
954 impl BigPongMsg {
955 fn new(i: u64, data_size: usize) -> BigPongMsg {
956 let mut data = Vec::<u8>::with_capacity(data_size);
957 let mut sum: u64 = 0;
958 let seed = SystemTime::now()
960 .duration_since(UNIX_EPOCH)
961 .expect("Time went backwards")
962 .as_millis() as u64;
963 for _ in 0..data_size {
964 let d = (seed * (2 + i) % 128) as u8;
965 data.push(d);
966 sum += d as u64;
967 }
968 BigPongMsg { i, data, sum }
969 }
970
971 fn validate(&self) {
972 let mut sum: u64 = 0;
973 for d in &self.data {
974 sum += *d as u64;
975 }
976 assert_eq!(self.sum, sum, "BigPongMsg invalid, {:?}", self);
977 }
978 }
979
980 impl Debug for BigPingMsg {
981 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
982 f.debug_struct("BigPingMsg")
983 .field("i", &self.i)
984 .field("data length", &self.data.len())
985 .field("sum", &self.sum)
986 .field("data 0", &self.data[0])
987 .field("data n", &self.data[&self.data.len() - 1])
988 .finish()
989 }
990 }
991 impl Debug for BigPongMsg {
992 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
993 f.debug_struct("BigPongMsg")
994 .field("i", &self.i)
995 .field("data length", &self.data.len())
996 .field("sum", &self.sum)
997 .field("data 0", &self.data[0])
998 .field("data n", &self.data[&self.data.len() - 1])
999 .finish()
1000 }
1001 }
1002
1003 struct BigPingPongSer;
1004
1005 impl BigPingPongSer {
1006 const SID: SerId = 43;
1007 }
1008
1009 impl Serialisable for BigPingMsg {
1010 fn ser_id(&self) -> SerId {
1011 43
1012 }
1013
1014 fn size_hint(&self) -> Option<usize> {
1015 Some(32 + self.data.len())
1016 }
1017
1018 fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
1019 buf.put_i8(PING_ID);
1020 buf.put_u64(self.i);
1021 buf.put_u64(self.data.len() as u64);
1022 for d in &self.data {
1023 buf.put_u8(*d);
1024 }
1025 buf.put_u64(self.sum);
1026 Result::Ok(())
1027 }
1028
1029 fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
1030 Ok(self)
1031 }
1032 }
1033
1034 impl Serialisable for BigPongMsg {
1035 fn ser_id(&self) -> SerId {
1036 43
1037 }
1038
1039 fn size_hint(&self) -> Option<usize> {
1040 Some(32 + self.data.len())
1041 }
1042
1043 fn serialise(&self, buf: &mut dyn BufMut) -> Result<(), SerError> {
1044 buf.put_i8(PONG_ID);
1045 buf.put_u64(self.i);
1046 buf.put_u64(self.data.len() as u64);
1047 for d in &self.data {
1048 buf.put_u8(*d);
1049 }
1050 buf.put_u64(self.sum);
1051 Result::Ok(())
1052 }
1053
1054 fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
1055 Ok(self)
1056 }
1057 }
1058
1059 impl Deserialiser<BigPingMsg> for BigPingPongSer {
1060 const SER_ID: SerId = Self::SID;
1061
1062 fn deserialise(buf: &mut dyn Buf) -> Result<BigPingMsg, SerError> {
1063 if buf.remaining() < 18 {
1064 return Err(SerError::invalid_data(format!(
1065 "Serialised typed has 18 bytes but only {} bytes remain in buffer.",
1066 buf.remaining()
1067 )));
1068 }
1069 match buf.get_i8() {
1070 PING_ID => {
1071 let i = buf.get_u64();
1072 let data_len = buf.get_u64() as usize;
1073 let mut data = Vec::with_capacity(data_len);
1074 for j in 0..data_len {
1075 data.insert(j, buf.get_u8());
1076 }
1077 let sum = buf.get_u64();
1078 assert_eq!(buf.remaining(), 0, "Buffer too big! {}", buf.remaining());
1079 Ok(BigPingMsg { i, data, sum })
1080 }
1081 PONG_ID => Err(SerError::invalid_type(
1082 "Found BigPongMsg, but expected BigPingMsg.",
1083 )),
1084 id => Err(SerError::invalid_type(format!(
1085 "Found unknown id {}, but expected BigPingMsg.",
1086 id
1087 ))),
1088 }
1089 }
1090 }
1091
1092 impl Deserialiser<BigPongMsg> for BigPingPongSer {
1093 const SER_ID: SerId = Self::SID;
1094
1095 fn deserialise(buf: &mut dyn Buf) -> Result<BigPongMsg, SerError> {
1096 if buf.remaining() < 18 {
1097 return Err(SerError::invalid_data(format!(
1098 "Serialised typed has 18 bytes but only {} bytes remain in buffer.",
1099 buf.remaining()
1100 )));
1101 }
1102 match buf.get_i8() {
1103 PONG_ID => {
1104 let i = buf.get_u64();
1105 let data_len = buf.get_u64() as usize;
1106 let mut data = Vec::with_capacity(data_len);
1107 for j in 0..data_len {
1108 data.insert(j, buf.get_u8());
1109 }
1110 let sum = buf.get_u64();
1111 assert_eq!(buf.remaining(), 0, "Buffer too big!");
1112 Ok(BigPongMsg { i, data, sum })
1113 }
1114 PING_ID => Err(SerError::invalid_type(
1115 "Found BigPingMsg, but expected BigPongMsg.",
1116 )),
1117 id => Err(SerError::invalid_type(format!(
1118 "Found unknown id {}, but expected BigPingMsg.",
1119 id
1120 ))),
1121 }
1122 }
1123 }
1124
1125 #[derive(ComponentDefinition)]
1133 pub struct BigPingerAct {
1134 ctx: ComponentContext<BigPingerAct>,
1135 target: ActorPath,
1136 pub count: u64,
1138 data_size: usize,
1139 eager: bool,
1140 buffer_config: BufferConfig,
1141 pre_serialised: Option<VecDeque<ChunkRef>>,
1142 promise: Option<KPromise<()>>,
1143 }
1144
1145 #[allow(dead_code)]
1146 impl BigPingerAct {
1147 pub fn new_lazy(target: ActorPath, data_size: usize) -> BigPingerAct {
1153 BigPingerAct {
1154 ctx: ComponentContext::uninitialised(),
1155 target,
1156 count: 0,
1157 data_size,
1158 eager: false,
1159 buffer_config: BufferConfig::default(),
1160 pre_serialised: None,
1161 promise: None,
1162 }
1163 }
1164
1165 pub fn new_eager(
1168 target: ActorPath,
1169 data_size: usize,
1170 buffer_config: BufferConfig,
1171 ) -> BigPingerAct {
1172 BigPingerAct {
1173 ctx: ComponentContext::uninitialised(),
1174 target,
1175 count: 0,
1176 data_size,
1177 eager: true,
1178 buffer_config,
1179 pre_serialised: None,
1180 promise: None,
1181 }
1182 }
1183
1184 pub fn new_preserialised(
1187 target: ActorPath,
1188 data_size: usize,
1189 buffer_config: BufferConfig,
1190 ) -> BigPingerAct {
1191 let pre_serialised = VecDeque::with_capacity(PING_COUNT as usize);
1192 BigPingerAct {
1193 ctx: ComponentContext::uninitialised(),
1194 target,
1195 count: 0,
1196 data_size,
1197 eager: true,
1198 buffer_config,
1199 pre_serialised: Some(pre_serialised),
1200 promise: None,
1201 }
1202 }
1203
1204 pub fn completion_future(&mut self) -> KFuture<()> {
1207 let (promise, future) = promise();
1208 self.promise = Some(promise);
1209 future
1210 }
1211 }
1212
1213 impl ComponentLifecycle for BigPingerAct {
1214 fn on_start(&mut self) -> HandlerResult {
1215 let ping = BigPingMsg::new(0, self.data_size);
1216 if self.eager {
1217 self.ctx
1218 .init_buffers(Some(self.buffer_config.clone()), None);
1219 }
1220 debug!(self.ctx.log(), "Starting, sending first ping: {:?}", ping);
1221 if self.eager {
1222 if let Some(msgs) = &mut self.pre_serialised {
1223 for i in 0..PING_COUNT {
1224 msgs.push_back(
1225 self.ctx
1226 .preserialise(&BigPingMsg::new(i, self.data_size))
1227 .expect("serialise"),
1228 );
1229 }
1230 self.target
1231 .tell_preserialised(msgs.pop_front().expect("popping"), self)
1232 .expect("telling");
1233 } else {
1234 self.target.tell_serialised(ping, self).expect("serialise");
1235 }
1236 } else {
1237 self.target.tell(ping, self);
1238 }
1239 Handled::OK
1240 }
1241 }
1242
1243 impl Actor for BigPingerAct {
1244 type Message = Never;
1245
1246 fn receive_local(&mut self, _pong: Self::Message) -> HandlerResult {
1247 unimplemented!()
1248 }
1249
1250 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
1251 match msg.try_deserialise::<BigPongMsg, BigPingPongSer>() {
1252 Ok(pong) => {
1253 debug!(self.ctx.log(), "Got msg {:?}", pong);
1254 pong.validate();
1255 assert_eq!(
1256 pong.data.len(),
1257 self.data_size,
1258 "Incorrect Pong data-length"
1259 );
1260 assert_eq!(pong.i, self.count % PING_COUNT, "Incorrect index of pong!");
1261 self.count += 1;
1262 if !self.count.is_multiple_of(PING_COUNT) {
1263 if self.eager {
1264 if let Some(msgs) = &mut self.pre_serialised {
1265 self.target
1266 .tell_preserialised(msgs.pop_front().expect("popping"), self)
1267 .expect("telling");
1268 } else {
1269 let ping = BigPingMsg::new(pong.i + 1, self.data_size);
1270 self.target.tell_serialised(ping, self).expect("serialise");
1271 }
1272 } else {
1273 let ping = BigPingMsg::new(pong.i + 1, self.data_size);
1274 self.target.tell(ping, self);
1275 }
1276 } else if let Some(promise) = self.promise.take() {
1277 promise.complete().expect("Failed to fulfil promise");
1278 }
1279 }
1280 Err(e) => error!(self.ctx.log(), "Error deserialising BigPongMsg: {:?}", e),
1281 }
1282 Handled::OK
1283 }
1284 }
1285
1286 #[derive(ComponentDefinition)]
1290 pub struct BigPongerAct {
1291 ctx: ComponentContext<BigPongerAct>,
1292 eager: bool,
1293 buffer_config: BufferConfig,
1294 }
1295
1296 #[allow(dead_code)]
1297 impl BigPongerAct {
1298 pub fn new_lazy() -> BigPongerAct {
1301 BigPongerAct {
1302 ctx: ComponentContext::uninitialised(),
1303 eager: false,
1304 buffer_config: BufferConfig::default(),
1305 }
1306 }
1307
1308 pub fn new_eager(buffer_config: BufferConfig) -> BigPongerAct {
1311 BigPongerAct {
1312 ctx: ComponentContext::uninitialised(),
1313 eager: true,
1314 buffer_config,
1315 }
1316 }
1317 }
1318
1319 impl ComponentLifecycle for BigPongerAct {
1320 fn on_start(&mut self) -> HandlerResult {
1321 if self.eager {
1322 self.ctx
1323 .init_buffers(Some(self.buffer_config.clone()), None);
1324 }
1325 Handled::OK
1326 }
1327 }
1328
1329 impl Actor for BigPongerAct {
1330 type Message = Never;
1331
1332 fn receive_local(&mut self, _ping: Self::Message) -> HandlerResult {
1333 unimplemented!()
1334 }
1335
1336 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
1337 let sender = msg.sender;
1338 match_deser! {
1339 (msg.data) {
1340 msg(ping): BigPingMsg [using BigPingPongSer] => {
1341 debug!(self.ctx.log(), "Got msg {:?} from {}", ping, sender);
1342 ping.validate();
1343 let pong = BigPongMsg::new(ping.i, ping.data.len());
1344 if self.eager {
1345 sender
1346 .tell_serialised(pong, self)
1347 .expect("BigPongMsg should serialise");
1348 } else {
1349 sender.tell(pong, self);
1350 }
1351 },
1352 err(e) => error!(self.ctx.log(), "Error deserialising BigPingMsg: {:?}", e),
1353 }
1354 }
1355 Handled::OK
1356 }
1357 }
1358
1359 #[derive(ComponentDefinition)]
1362 pub struct NetworkStatusCounter {
1363 ctx: ComponentContext<NetworkStatusCounter>,
1364 pub network_status_port: RequiredPort<NetworkStatusPort>,
1366 pub connection_established: u32,
1368 pub connection_lost: u32,
1370 pub connection_dropped: u32,
1372 pub connection_closed: u32,
1374 pub connected_systems: Vec<(SystemPath, SessionId)>,
1376 pub disconnected_systems: Vec<(SystemPath, SessionId)>,
1378 pub soft_connection_limit_exceeded: u32,
1380 pub hard_connection_limit_reached: u32,
1382 pub network_out_of_buffers: u32,
1384 pub critical_network_failure: u32,
1386 network_status_queue_sender: Option<Sender<NetworkStatus>>,
1387 started_promise: Option<KPromise<()>>,
1388 }
1389
1390 impl NetworkStatusCounter {
1391 pub fn new() -> NetworkStatusCounter {
1393 Self {
1394 ctx: ComponentContext::uninitialised(),
1395 network_status_port: RequiredPort::<NetworkStatusPort>::uninitialised(),
1396 connection_established: 0,
1397 connection_lost: 0,
1398 connection_dropped: 0,
1399 connection_closed: 0,
1400 connected_systems: Vec::new(),
1401 disconnected_systems: Vec::new(),
1402 soft_connection_limit_exceeded: 0,
1403 hard_connection_limit_reached: 0,
1404 network_out_of_buffers: 0,
1405 critical_network_failure: 0,
1406 network_status_queue_sender: None,
1407 started_promise: None,
1408 }
1409 }
1410
1411 pub fn set_status_sender(&mut self, sender: Sender<NetworkStatus>) {
1413 self.network_status_queue_sender = Some(sender);
1414 }
1415
1416 pub fn send_status_request(&mut self, request: NetworkStatusRequest) {
1418 debug!(self.ctx.log(), "Sending Status Request. {:?}", request);
1419 self.network_status_port.trigger(request);
1420 }
1421
1422 pub fn started_future(&mut self) -> KFuture<()> {
1424 let (promise, future) = promise();
1425 self.started_promise = Some(promise);
1426 future
1427 }
1428
1429 pub fn corrupt_network(&mut self) -> Result<(), SerError> {
1435 if let Some((receiving_system, _)) = self
1436 .connected_systems
1437 .iter()
1438 .find(|connected_system| !self.disconnected_systems.contains(connected_system))
1439 {
1440 warn!(self.ctx.log(), "Corrupting the local Network layer");
1441 let receiver = ActorPath::from((receiving_system.clone(), Uuid::new_v4()));
1442 let mut corrupted_chunk = self.ctx.preserialise(&PongMsg { i: 1 }).unwrap();
1443 corrupted_chunk.advance(50);
1444
1445 let corrupted_message =
1446 DispatchData::Serialised(SerialisedFrame::ChunkRef(corrupted_chunk));
1447
1448 self.ctx
1449 .system()
1450 .dispatcher_ref()
1451 .tell(DispatchEnvelope::Msg {
1452 src: self.ctx.actor_path(),
1453 dst: receiver,
1454 msg: corrupted_message,
1455 });
1456
1457 Ok(())
1458 } else {
1459 Err(SerError::unknown("No Valid Receivers"))
1460 }
1461 }
1462 }
1463
1464 impl Default for NetworkStatusCounter {
1465 fn default() -> Self {
1466 Self::new()
1467 }
1468 }
1469
1470 impl ComponentLifecycle for NetworkStatusCounter {
1471 fn on_start(&mut self) -> HandlerResult {
1472 if let Some(promise) = self.started_promise.take() {
1473 promise.complete().expect("Failed to fulfil promise");
1474 }
1475 Handled::OK
1476 }
1477
1478 fn on_stop(&mut self) -> HandlerResult {
1479 Handled::OK
1480 }
1481
1482 fn on_kill(&mut self) -> HandlerResult {
1483 Handled::OK
1484 }
1485 }
1486
1487 impl Actor for NetworkStatusCounter {
1488 type Message = ();
1489
1490 fn receive_local(&mut self, _msg: Self::Message) -> HandlerResult {
1491 unimplemented!()
1492 }
1493
1494 fn receive_network(&mut self, _msg: NetMessage) -> HandlerResult {
1495 unimplemented!("Ignoring network messages.")
1496 }
1497 }
1498
1499 impl Require<NetworkStatusPort> for NetworkStatusCounter {
1500 fn handle(&mut self, event: <NetworkStatusPort as Port>::Indication) -> HandlerResult {
1501 debug!(
1502 self.ctx.log(),
1503 "Got NetworkStatusPort indication. {:?}", event
1504 );
1505 if let Some(sender) = &self.network_status_queue_sender {
1506 sender
1507 .send(event.clone())
1508 .expect("StatusCounter to send NetworkStatus");
1509 }
1510 match event {
1511 NetworkStatus::ConnectionEstablished(system_path, session) => {
1512 self.connection_established += 1;
1513 self.connected_systems.push((system_path, session));
1514 }
1515 NetworkStatus::ConnectionLost(system_path, session) => {
1516 self.connection_lost += 1;
1517 self.disconnected_systems.push((system_path, session));
1518 }
1519 NetworkStatus::ConnectionDropped(_) => {
1520 self.connection_dropped += 1;
1521 }
1522 NetworkStatus::ConnectionClosed(system_path, session) => {
1523 self.connection_closed += 1;
1524 self.disconnected_systems.push((system_path, session));
1525 }
1526 NetworkStatus::SoftConnectionLimitExceeded => {
1527 self.soft_connection_limit_exceeded += 1
1528 }
1529 NetworkStatus::HardConnectionLimitReached => {
1530 self.hard_connection_limit_reached += 1
1531 }
1532 NetworkStatus::CriticalNetworkFailure => {
1533 self.critical_network_failure += 1;
1534 }
1535 NetworkStatus::BlockedIpNet(_) => (),
1536 NetworkStatus::AllowedIpNet(_) => (),
1537 NetworkStatus::AllowedIp(_) => (),
1538 NetworkStatus::BlockedSystem(_) => (),
1539 NetworkStatus::BlockedIp(_) => (),
1540 NetworkStatus::AllowedSystem(_) => (),
1541 }
1542 Handled::OK
1543 }
1544 }
1545
1546 #[derive(ComponentDefinition)]
1549 pub struct PingStream {
1550 ctx: ComponentContext<PingStream>,
1551 target: ActorPath,
1552 period: Duration,
1553 timer: Option<ScheduledTimer>,
1554 pub ping_count: u64,
1556 pub pong_count: u64,
1558 }
1559
1560 impl PingStream {
1561 pub fn new(target: ActorPath, period: Duration) -> Self {
1564 Self {
1565 ctx: ComponentContext::uninitialised(),
1566 target,
1567 period,
1568 timer: None,
1569 ping_count: 0,
1570 pong_count: 0,
1571 }
1572 }
1573
1574 pub fn start_pinging(&mut self) {
1576 let timer =
1577 self.schedule_periodic(Duration::from_millis(0), self.period, move |p, _| {
1578 p.ping();
1579 Handled::OK
1580 });
1581 self.timer = Some(timer);
1582 }
1583
1584 pub fn stop_pinging(&mut self) {
1586 let timer = self.timer.take().expect("No timer");
1587 self.cancel_timer(timer);
1588 }
1589
1590 fn ping(&mut self) {
1591 self.ping_count += 1;
1592 self.target
1593 .tell_serialised(PingMsg { i: self.ping_count }, self)
1594 .expect("serialise");
1595 }
1596 }
1597
1598 impl ComponentLifecycle for PingStream {
1599 fn on_start(&mut self) -> HandlerResult {
1600 debug!(self.ctx.log(), "Starting");
1601 self.start_pinging();
1602 Handled::OK
1603 }
1604 }
1605
1606 impl Actor for PingStream {
1607 type Message = Never;
1608
1609 fn receive_local(&mut self, _: Self::Message) -> HandlerResult {
1610 unimplemented!()
1611 }
1612
1613 fn receive_network(&mut self, msg: NetMessage) -> HandlerResult {
1614 match msg.try_deserialise::<PongMsg, PingPongSer>() {
1615 Ok(pong) => {
1616 debug!(self.ctx.log(), "Got msg {:?}", pong);
1617 self.pong_count += 1;
1618 }
1619 Err(e) => error!(self.ctx.log(), "Error deserialising PongMsg: {:?}", e),
1620 }
1621 Handled::OK
1622 }
1623 }
1624}