use super::*;
use crate::{
actors::{
Actor,
ActorPath,
Dispatcher,
DynActorRef,
NamedPath,
SystemPath,
Transport,
Transport::Tcp,
},
component::{Component, ComponentContext, ExecuteResult},
messaging::{
ActorRegistration,
DispatchData,
DispatchEnvelope,
EventEnvelope,
MsgEnvelope,
NetMessage,
PathResolvable,
PolicyRegistration,
RegistrationEnvelope,
RegistrationError,
RegistrationEvent,
RegistrationPromise,
},
net::{buffers::*, ConnectionState, NetworkBridgeErr, Protocol, SocketAddr},
prelude::SessionId,
timer::timer_manager::Timer,
};
use arc_swap::ArcSwap;
use futures::{
self,
task::{Context, Poll},
};
use ipnet::IpNet;
use lookup::{ActorLookup, ActorStore, InsertResult, LookupResult};
use queue_manager::QueueManager;
use rustc_hash::FxHashMap;
use std::{collections::VecDeque, net::IpAddr, pin::Pin, sync::Arc, time::Duration};
pub mod lookup;
pub mod queue_manager;
mod defaults {
pub(crate) const RETRY_CONNECTIONS_INTERVAL: u64 = 5000;
pub(crate) const BOOT_TIMEOUT: u64 = 5000;
pub(crate) const MAX_RETRY_ATTEMPTS: u8 = 10;
pub(crate) const SOFT_CONNECTION_LIMIT: u32 = 1000;
pub(crate) const HARD_CONNECTION_LIMIT: u32 = 1100;
}
type NetHashMap<K, V> = FxHashMap<K, V>;
#[derive(Clone, Debug)]
pub struct NetworkConfig {
addr: SocketAddr,
transport: Transport,
buffer_config: BufferConfig,
custom_allocator: Option<Arc<dyn ChunkAllocator>>,
tcp_nodelay: bool,
max_connection_retry_attempts: u8,
connection_retry_interval: u64,
boot_timeout: u64,
soft_connection_limit: u32,
hard_connection_limit: u32,
}
impl NetworkConfig {
pub fn new(addr: SocketAddr) -> Self {
NetworkConfig {
addr,
transport: Transport::Tcp,
buffer_config: BufferConfig::default(),
custom_allocator: None,
tcp_nodelay: true,
max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
boot_timeout: defaults::BOOT_TIMEOUT,
soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
}
}
pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
buffer_config.validate();
let mut cfg = NetworkConfig::new(addr);
cfg.set_buffer_config(buffer_config);
cfg
}
pub fn with_custom_allocator(
addr: SocketAddr,
buffer_config: BufferConfig,
custom_allocator: Arc<dyn ChunkAllocator>,
) -> Self {
buffer_config.validate();
NetworkConfig {
addr,
transport: Transport::Tcp,
buffer_config,
custom_allocator: Some(custom_allocator),
tcp_nodelay: true,
max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
boot_timeout: defaults::BOOT_TIMEOUT,
soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
}
}
pub fn with_socket(mut self, addr: SocketAddr) -> Self {
self.addr = addr;
self
}
pub fn build(self) -> impl Fn(KPromise<()>) -> NetworkDispatcher {
move |notify_ready| NetworkDispatcher::with_config(self.clone(), notify_ready)
}
pub fn get_buffer_config(&self) -> &BufferConfig {
&self.buffer_config
}
pub fn set_buffer_config(&mut self, buffer_config: BufferConfig) -> () {
self.buffer_config = buffer_config;
}
pub fn get_custom_allocator(&self) -> &Option<Arc<dyn ChunkAllocator>> {
&self.custom_allocator
}
pub fn get_tcp_nodelay(&self) -> bool {
self.tcp_nodelay
}
pub fn set_tcp_nodelay(&mut self, nodelay: bool) {
self.tcp_nodelay = nodelay;
}
pub fn set_max_connection_retry_attempts(&mut self, count: u8) {
self.max_connection_retry_attempts = count;
}
pub fn get_max_connection_retry_attempts(&self) -> u8 {
self.max_connection_retry_attempts
}
pub fn set_connection_retry_interval(&mut self, milliseconds: u64) {
self.connection_retry_interval = milliseconds;
}
pub fn get_connection_retry_interval(&self) -> u64 {
self.connection_retry_interval
}
pub fn set_boot_timeout(&mut self, milliseconds: u64) {
self.boot_timeout = milliseconds;
}
pub fn get_boot_timeout(&self) -> u64 {
self.boot_timeout
}
pub fn set_soft_connection_limit(&mut self, limit: u32) {
self.soft_connection_limit = limit;
}
pub fn get_soft_connection_limit(&self) -> u32 {
self.soft_connection_limit
}
pub fn set_hard_connection_limit(&mut self, limit: u32) {
self.hard_connection_limit = limit;
}
pub fn get_hard_connection_limit(&self) -> u32 {
self.hard_connection_limit
}
}
impl Default for NetworkConfig {
fn default() -> Self {
NetworkConfig {
addr: "127.0.0.1:0".parse().unwrap(),
transport: Transport::Tcp,
buffer_config: BufferConfig::default(),
custom_allocator: None,
tcp_nodelay: true,
max_connection_retry_attempts: defaults::MAX_RETRY_ATTEMPTS,
connection_retry_interval: defaults::RETRY_CONNECTIONS_INTERVAL,
boot_timeout: defaults::BOOT_TIMEOUT,
soft_connection_limit: defaults::SOFT_CONNECTION_LIMIT,
hard_connection_limit: defaults::HARD_CONNECTION_LIMIT,
}
}
}
pub struct NetworkStatusPort;
impl Port for NetworkStatusPort {
type Indication = NetworkStatus;
type Request = NetworkStatusRequest;
}
#[derive(Clone, Debug)]
pub enum NetworkStatus {
ConnectionEstablished(SystemPath, SessionId),
ConnectionLost(SystemPath, SessionId),
ConnectionDropped(SystemPath),
ConnectionClosed(SystemPath, SessionId),
BlockedSystem(SystemPath),
BlockedIp(IpAddr),
BlockedIpNet(IpNet),
AllowedIpNet(IpNet),
AllowedSystem(SystemPath),
AllowedIp(IpAddr),
SoftConnectionLimitExceeded,
HardConnectionLimitReached,
CriticalNetworkFailure,
}
#[derive(Clone, Debug)]
pub enum NetworkStatusRequest {
DisconnectSystem(SystemPath),
ConnectSystem(SystemPath),
BlockSystem(SystemPath),
BlockIp(IpAddr),
BlockIpNet(IpNet),
AllowSystem(SystemPath),
AllowIp(IpAddr),
AllowIpNet(IpNet),
}
#[derive(ComponentDefinition)]
pub struct NetworkDispatcher {
ctx: ComponentContext<NetworkDispatcher>,
connections: NetHashMap<SocketAddr, ConnectionState>,
cfg: NetworkConfig,
lookup: Arc<ArcSwap<ActorStore>>,
net_bridge: Option<net::Bridge>,
system_path: Option<SystemPath>,
queue_manager: QueueManager,
reaper: lookup::gc::ActorRefReaper,
notify_ready: Option<KPromise<()>>,
retry_map: FxHashMap<SocketAddr, u8>,
garbage_buffers: VecDeque<BufferChunk>,
network_status_port: ProvidedPort<NetworkStatusPort>,
}
impl NetworkDispatcher {
pub fn new(notify_ready: KPromise<()>) -> Self {
let config = NetworkConfig::default();
NetworkDispatcher::with_config(config, notify_ready)
}
pub fn with_config(cfg: NetworkConfig, notify_ready: KPromise<()>) -> Self {
let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new()));
let reaper = lookup::gc::ActorRefReaper::default();
NetworkDispatcher {
ctx: ComponentContext::uninitialised(),
connections: Default::default(),
cfg,
lookup,
net_bridge: None,
system_path: None,
queue_manager: QueueManager::new(),
reaper,
notify_ready: Some(notify_ready),
garbage_buffers: VecDeque::new(),
retry_map: Default::default(),
network_status_port: ProvidedPort::uninitialised(),
}
}
pub fn system_path_ref(&mut self) -> &SystemPath {
match self.system_path {
Some(ref path) => path,
None => {
let _ = self.system_path(); if let Some(ref path) = self.system_path {
path
} else {
unreachable!(
"Cached value should have been filled by calling self.system_path()!"
);
}
}
}
}
fn start(&mut self) -> () {
debug!(self.ctx.log(), "Starting self and network bridge");
self.reaper = lookup::gc::ActorRefReaper::from_config(self.ctx.config());
self.start_bridge(self.cfg.addr);
let deadletter: DynActorRef = self.ctx.system().deadletter_ref().dyn_ref();
self.lookup.rcu(|current| {
let mut next = ActorStore::clone(current);
next.insert(PathResolvable::System, deadletter.clone())
.expect("Deadletter shouldn't error");
next
});
self.schedule_retries();
}
fn start_bridge(&mut self, address: SocketAddr) -> () {
let dispatcher = self
.actor_ref()
.hold()
.expect("Self can hardly be deallocated!");
let bridge_logger = self.ctx.log().new(o!("owner" => "Bridge"));
let network_thread_logger = self.ctx.log().new(o!("owner" => "NetworkThread"));
let (mut bridge, _addr) = net::Bridge::new(
self.lookup.clone(),
network_thread_logger,
bridge_logger,
address,
dispatcher.clone(),
&self.cfg,
);
bridge.set_dispatcher(dispatcher);
self.net_bridge = Some(bridge);
}
fn handle_network_failure(&mut self) -> () {
self.network_status_port
.trigger(NetworkStatus::CriticalNetworkFailure);
let faulty_bridge = self.net_bridge.take();
let connections: Vec<(SocketAddr, ConnectionState)> = self.connections.drain().collect();
for (address, state) in connections {
if let ConnectionState::Connected(id) = state {
self.connection_lost(SystemPath::with_socket(Transport::Tcp, address), id);
} else {
self.connections.insert(address, state);
}
}
let bound_address = faulty_bridge
.map(|b| b.local_addr().unwrap_or(self.cfg.addr))
.unwrap_or(self.cfg.addr);
self.start_bridge(bound_address);
}
fn stop(&mut self) -> () {
if let Some(bridge) = self.net_bridge.take() {
if let Err(e) = bridge.stop() {
error!(
self.ctx().log(),
"NetworkBridge did not shut down as expected! Error was:\n {:?}\n", e
);
}
}
}
fn kill(&mut self) -> () {
if let Some(bridge) = self.net_bridge.take() {
if let Err(e) = bridge.kill() {
error!(
self.ctx().log(),
"NetworkBridge did not shut down as expected! Error was:\n {:?}\n", e
);
}
}
}
fn schedule_reaper(&mut self) {
if !self.reaper.is_scheduled() {
self.reaper.schedule();
} else {
let num_reaped = self.reaper.run(&self.lookup);
if num_reaped == 0 {
self.reaper.strategy_mut().incr();
} else {
self.reaper.strategy_mut().decr();
}
}
let next_wakeup = self.reaper.strategy().curr();
debug!(
self.ctx().log(),
"Scheduling reaping at {:?}ms", next_wakeup
);
let mut retry_queue = VecDeque::new();
for mut trash in self.garbage_buffers.drain(..) {
if !trash.free() {
retry_queue.push_back(trash);
}
}
self.garbage_buffers.append(&mut retry_queue);
self.schedule_once(Duration::from_millis(next_wakeup), move |target, _id| {
target.schedule_reaper();
Handled::Ok
});
}
fn schedule_retries(&mut self) {
let drain = self.retry_map.clone();
self.retry_map.clear();
for (addr, retry) in drain {
if retry < self.cfg.max_connection_retry_attempts {
self.retry_map.insert(addr, retry + 1);
if let Some(bridge) = &self.net_bridge {
debug!(
self.ctx().log(),
"Dispatcher retrying connection to host {}, attempt {}/{}",
addr,
retry,
self.cfg.max_connection_retry_attempts
);
bridge.connect(Transport::Tcp, addr).unwrap();
}
} else {
info!(
self.ctx().log(),
"Dispatcher giving up on remote host {}, dropping queues", addr
);
self.queue_manager.drop_queue(&addr);
self.connections.remove(&addr);
self.network_status_port
.trigger(NetworkStatus::ConnectionDropped(SystemPath::with_socket(
Transport::Tcp,
addr,
)));
}
}
self.schedule_once(
Duration::from_millis(self.cfg.connection_retry_interval),
move |target, _id| {
target.schedule_retries();
Handled::Ok
},
);
}
fn on_event(&mut self, ev: EventEnvelope) {
match ev {
EventEnvelope::Network(ev) => match ev {
NetworkStatus::ConnectionEstablished(system_path, session) => {
self.connection_established(system_path, session)
}
NetworkStatus::ConnectionLost(system_path, session) => {
self.connection_lost(system_path, session)
}
NetworkStatus::ConnectionClosed(system_path, session) => {
self.connection_closed(system_path, session)
}
NetworkStatus::ConnectionDropped(system_path) => {
let _ = self.retry_map.remove(&system_path.socket_address());
self.network_status_port
.trigger(NetworkStatus::ConnectionDropped(system_path));
}
NetworkStatus::BlockedSystem(system_path) => {
self.connections
.insert(system_path.socket_address(), ConnectionState::Blocked);
self.network_status_port
.trigger(NetworkStatus::BlockedSystem(system_path));
}
NetworkStatus::BlockedIp(ip_addr) => {
self.network_status_port
.trigger(NetworkStatus::BlockedIp(ip_addr));
}
NetworkStatus::BlockedIpNet(ip_net) => {
self.network_status_port
.trigger(NetworkStatus::BlockedIpNet(ip_net));
}
NetworkStatus::AllowedSystem(system_path) => {
self.connections.remove(&system_path.socket_address());
self.network_status_port
.trigger(NetworkStatus::AllowedSystem(system_path));
}
NetworkStatus::AllowedIp(ip_addr) => {
self.network_status_port
.trigger(NetworkStatus::AllowedIp(ip_addr));
}
NetworkStatus::AllowedIpNet(ip_net) => {
self.network_status_port
.trigger(NetworkStatus::AllowedIpNet(ip_net));
}
NetworkStatus::SoftConnectionLimitExceeded => self
.network_status_port
.trigger(NetworkStatus::SoftConnectionLimitExceeded),
NetworkStatus::HardConnectionLimitReached => self
.network_status_port
.trigger(NetworkStatus::HardConnectionLimitReached),
NetworkStatus::CriticalNetworkFailure => self.handle_network_failure(),
},
EventEnvelope::RejectedData((addr, data)) => {
self.queue_manager.enqueue_priority_data(*data, addr);
self.retry_map.entry(addr).or_insert(0);
}
}
}
fn connection_established(&mut self, system_path: SystemPath, id: SessionId) {
info!(
self.ctx().log(),
"registering newly connected conn at {:?}", system_path
);
let addr = &system_path.socket_address();
self.network_status_port
.trigger(NetworkStatus::ConnectionEstablished(system_path, id));
let _ = self.retry_map.remove(addr);
if self.queue_manager.has_data(addr) {
while let Some(frame) = self.queue_manager.pop_data(addr) {
if let Some(bridge) = &self.net_bridge {
if let Err(e) = bridge.route(*addr, frame, net::Protocol::Tcp) {
error!(self.ctx.log(), "Bridge error while routing {:?}", e);
}
}
}
}
self.connections
.insert(*addr, ConnectionState::Connected(id));
}
fn connection_closed(&mut self, system_path: SystemPath, id: SessionId) {
let addr = &system_path.socket_address();
self.network_status_port
.trigger(NetworkStatus::ConnectionClosed(system_path, id));
if let Some(bridge) = &self.net_bridge {
if let Err(e) = bridge.ack_closed(*addr) {
error!(
self.ctx.log(),
"Bridge error while acking closed connection {:?}", e
);
}
}
self.connections.insert(*addr, ConnectionState::Closed(id));
if self.queue_manager.has_data(addr) {
self.retry_map.insert(*addr, 0);
}
}
fn connection_lost(&mut self, system_path: SystemPath, id: SessionId) {
let addr = &system_path.socket_address();
if !self.retry_map.contains_key(addr) {
warn!(self.ctx().log(), "connection lost to {:?}", addr);
self.retry_map.insert(*addr, 0); }
self.network_status_port
.trigger(NetworkStatus::ConnectionLost(system_path, id));
if let Some(bridge) = &self.net_bridge {
if let Err(e) = bridge.ack_closed(*addr) {
error!(
self.ctx.log(),
"Bridge error while acking lost connection {:?}", e
);
}
}
self.connections.insert(*addr, ConnectionState::Lost(id));
}
fn route_local(&mut self, dst: ActorPath, msg: DispatchData) -> () {
let lookup = self.lookup.load();
let lookup_result = lookup.get_by_actor_path(&dst);
match msg.into_local() {
Ok(netmsg) => match lookup_result {
LookupResult::Ref(actor) => {
actor.enqueue(netmsg);
}
LookupResult::Group(group) => {
group.route(netmsg, self.log());
}
LookupResult::None => {
error!(
self.ctx.log(),
"No local actor found at {:?}. Forwarding to DeadletterBox",
netmsg.receiver,
);
self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
}
LookupResult::Err(e) => {
error!(
self.ctx.log(),
"An error occurred during local actor lookup at {:?}. Forwarding to DeadletterBox. The error was: {}",
netmsg.receiver,
e
);
self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
}
},
Err(e) => {
error!(self.log(), "Could not serialise msg: {:?}. Dropping...", e);
}
}
}
fn route_remote_udp(
&mut self,
addr: SocketAddr,
data: DispatchData,
) -> Result<(), NetworkBridgeErr> {
if let Some(bridge) = &self.net_bridge {
bridge.route(addr, data, net::Protocol::Udp)?;
} else {
warn!(
self.ctx.log(),
"Dropping UDP message to {}, as bridge is not connected.", addr
);
}
Ok(())
}
fn route_remote_tcp(
&mut self,
addr: SocketAddr,
data: DispatchData,
) -> Result<(), NetworkBridgeErr> {
let state: &mut ConnectionState =
self.connections.entry(addr).or_insert(ConnectionState::New);
let next: Option<ConnectionState> = match *state {
ConnectionState::New => {
debug!(
self.ctx.log(),
"No connection found; establishing and queuing frame"
);
self.queue_manager.enqueue_data(data, addr);
if let Some(ref mut bridge) = self.net_bridge {
debug!(self.ctx.log(), "Establishing new connection to {:?}", addr);
self.retry_map.insert(addr, 0); bridge.connect(Transport::Tcp, addr).unwrap();
Some(ConnectionState::Initializing)
} else {
error!(self.ctx.log(), "No network bridge found; dropping message");
None
}
}
ConnectionState::Connected(_) => {
if self.queue_manager.has_data(&addr) {
self.queue_manager.enqueue_data(data, addr);
if let Some(bridge) = &self.net_bridge {
while let Some(queued_data) = self.queue_manager.pop_data(&addr) {
bridge.route(addr, queued_data, net::Protocol::Tcp)?;
}
}
None
} else {
if let Some(bridge) = &self.net_bridge {
bridge.route(addr, data, net::Protocol::Tcp)?;
}
None
}
}
ConnectionState::Initializing => {
self.queue_manager.enqueue_data(data, addr);
None
}
ConnectionState::Closed(_) => {
self.queue_manager.enqueue_data(data, addr);
if let Some(bridge) = &self.net_bridge {
self.retry_map.entry(addr).or_insert(0);
bridge.connect(Tcp, addr)?;
}
Some(ConnectionState::Initializing)
}
ConnectionState::Lost(_) => {
self.queue_manager.enqueue_data(data, addr);
None
}
ConnectionState::Blocked => {
warn!(
self.ctx.log(),
"Tried sending a message to a blocked connection: {:?}. Dropping message.",
addr
);
None
}
};
if let Some(next) = next {
*state = next;
}
Ok(())
}
fn resolve_path(&mut self, resolvable: &PathResolvable) -> Result<ActorPath, PathParseError> {
match resolvable {
PathResolvable::Path(actor_path) => Ok(actor_path.clone()),
PathResolvable::Alias(alias) => self
.system_path()
.into_named_with_string(alias)
.map(|p| p.into()),
PathResolvable::Segments(segments) => self
.system_path()
.into_named_with_vec(segments.to_vec())
.map(|p| p.into()),
PathResolvable::ActorId(id) => Ok(self.system_path().into_unique(*id).into()),
PathResolvable::System => Ok(self.deadletter_path()),
}
}
fn route(&mut self, dst: ActorPath, msg: DispatchData) -> Result<(), NetworkBridgeErr> {
if self.system_path_ref() == dst.system() {
self.route_local(dst, msg);
Ok(())
} else {
let proto = dst.system().protocol();
match proto {
Transport::Local => {
self.route_local(dst, msg);
Ok(())
}
Transport::Tcp => {
let addr = SocketAddr::new(*dst.address(), dst.port());
self.route_remote_tcp(addr, msg)
}
Transport::Udp => {
let addr = SocketAddr::new(*dst.address(), dst.port());
self.route_remote_udp(addr, msg)
}
}
}
}
fn deadletter_path(&mut self) -> ActorPath {
ActorPath::Named(NamedPath::with_system(self.system_path(), Vec::new()))
}
fn register_actor(
&mut self,
registration: ActorRegistration,
update: bool,
promise: RegistrationPromise,
) {
let ActorRegistration { actor, path } = registration;
let res = self
.resolve_path(&path)
.map_err(RegistrationError::InvalidPath)
.and_then(|ap| {
let lease = self.lookup.load();
if lease.contains(&path) && !update {
warn!(
self.ctx.log(),
"Detected duplicate path during registration. The path will not be re-registered"
);
drop(lease);
Err(RegistrationError::DuplicateEntry)
} else {
drop(lease);
let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
self.lookup.rcu(|current| {
let mut next = ActorStore::clone(current);
result = next.insert(path.clone(), actor.clone());
next
});
if let Ok(ref res) = result {
if !res.is_empty() {
info!(self.ctx.log(), "Replaced entry for path={:?}", path);
}
}
result.map(|_| ap)
.map_err(RegistrationError::InvalidPath)
}
});
if res.is_ok() && !self.reaper.is_scheduled() {
self.schedule_reaper();
}
debug!(self.log(), "Completed actor registration with {:?}", res);
match promise {
RegistrationPromise::Fulfil(promise) => {
promise.fulfil(res).unwrap_or_else(|e| {
error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
});
}
RegistrationPromise::None => (), }
}
fn register_policy(
&mut self,
registration: PolicyRegistration,
update: bool,
promise: RegistrationPromise,
) {
let PolicyRegistration { policy, path } = registration;
let lease = self.lookup.load();
let path_res = PathResolvable::Segments(path);
let res = self
.resolve_path(&path_res)
.map_err(RegistrationError::InvalidPath)
.and_then(|ap| {
if lease.contains(&path_res) && !update {
warn!(
self.ctx.log(),
"Detected duplicate path during registration. The path will not be re-registered",
);
drop(lease);
Err(RegistrationError::DuplicateEntry)
} else {
drop(lease);
let_irrefutable!(path, PathResolvable::Segments(path) = path_res);
let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
self.lookup.rcu(|current| {
let mut next = ActorStore::clone(current);
result = next.set_routing_policy(&path, policy.clone());
next
});
if let Ok(ref res) = result {
if !res.is_empty() {
info!(self.ctx.log(), "Replaced entry for path={:?}", path);
}
}
result.map(|_| ap).map_err(RegistrationError::InvalidPath)
}
});
debug!(self.log(), "Completed policy registration with {:?}", res);
match promise {
RegistrationPromise::Fulfil(promise) => {
promise.fulfil(res).unwrap_or_else(|e| {
error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
});
}
RegistrationPromise::None => (), }
}
fn close_channel(&mut self, addr: SocketAddr) -> () {
if let Some(state) = self.connections.get_mut(&addr) {
match state {
ConnectionState::Connected(session) => {
trace!(
self.ctx.log(),
"Closing channel to connected system {}, session {:?}",
addr,
session
);
if let Some(bridge) = &self.net_bridge {
while self.queue_manager.has_data(&addr) {
if let Some(data) = self.queue_manager.pop_data(&addr) {
if let Err(e) = bridge.route(addr, data, Protocol::Tcp) {
error!(self.ctx.log(), "Bridge error while routing {:?}", e);
}
}
}
if let Err(e) = bridge.close_channel(addr) {
error!(self.ctx.log(), "Bridge error closing channel {:?}", e);
}
}
}
_ => {
warn!(
self.ctx.log(),
"Trying to close channel to a system which is not connected {}", addr
);
}
}
} else {
warn!(self.ctx.log(), "Closing channel to unknown system {}", addr);
}
}
}
impl Actor for NetworkDispatcher {
type Message = DispatchEnvelope;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
match msg {
DispatchEnvelope::Msg { src: _, dst, msg } => {
if let Err(e) = self.route(dst, msg) {
error!(self.ctx.log(), "Failed to route message: {:?}", e);
};
}
DispatchEnvelope::ForwardedMsg { msg } => {
if let Err(e) = self.route(msg.receiver.clone(), DispatchData::NetMessage(msg)) {
error!(self.ctx.log(), "Failed to route message: {:?}", e);
};
}
DispatchEnvelope::Registration(reg) => {
trace!(self.log(), "Got registration request: {:?}", reg);
let RegistrationEnvelope {
event,
update,
promise,
} = reg;
match event {
RegistrationEvent::Actor(rea) => self.register_actor(rea, update, promise),
RegistrationEvent::Policy(rep) => self.register_policy(rep, update, promise),
}
}
DispatchEnvelope::Event(ev) => self.on_event(ev),
DispatchEnvelope::LockedChunk(trash) => self.garbage_buffers.push_back(trash),
}
Handled::Ok
}
fn receive_network(&mut self, msg: NetMessage) -> Handled {
warn!(self.ctx.log(), "Received network message: {:?}", msg,);
Handled::Ok
}
}
impl Dispatcher for NetworkDispatcher {
fn system_path(&mut self) -> SystemPath {
match self.system_path {
Some(ref path) => path.clone(),
None => {
let bound_addr = match self.net_bridge {
Some(ref net_bridge) => net_bridge.local_addr().expect("If net bridge is ready, port should be as well!"),
None => panic!("You must wait until the socket is bound before attempting to create a system path!"),
};
let sp = SystemPath::new(self.cfg.transport, bound_addr.ip(), bound_addr.port());
self.system_path = Some(sp.clone());
sp
}
}
}
fn network_status_port(&mut self) -> &mut ProvidedPort<NetworkStatusPort> {
&mut self.network_status_port
}
}
impl ComponentLifecycle for NetworkDispatcher {
fn on_start(&mut self) -> Handled {
info!(self.ctx.log(), "Starting network...");
self.start();
info!(self.ctx.log(), "Started network just fine.");
if let Some(promise) = self.notify_ready.take() {
promise
.complete()
.unwrap_or_else(|e| error!(self.ctx.log(), "Could not start network! {:?}", e))
}
Handled::Ok
}
fn on_stop(&mut self) -> Handled {
info!(self.ctx.log(), "Stopping network...");
self.stop();
info!(self.ctx.log(), "Stopped network.");
Handled::Ok
}
fn on_kill(&mut self) -> Handled {
info!(self.ctx.log(), "Killing network...");
self.kill();
info!(self.ctx.log(), "Killed network.");
Handled::Ok
}
}
impl Provide<NetworkStatusPort> for NetworkDispatcher {
fn handle(&mut self, event: <NetworkStatusPort as Port>::Request) -> Handled {
debug!(
self.ctx.log(),
"Received NetworkStatusPort Request {:?}", event
);
match event {
NetworkStatusRequest::DisconnectSystem(system_path) => {
self.close_channel(system_path.socket_address());
}
NetworkStatusRequest::ConnectSystem(system_path) => {
if let Some(bridge) = &self.net_bridge {
bridge
.connect(system_path.protocol(), system_path.socket_address())
.unwrap();
}
}
NetworkStatusRequest::BlockIp(ip_addr) => {
debug!(self.ctx.log(), "Got BlockIp: {:?}", ip_addr);
if let Some(bridge) = &self.net_bridge {
bridge.block_ip(ip_addr).unwrap();
}
}
NetworkStatusRequest::BlockSystem(system_path) => {
debug!(self.ctx.log(), "Got BlockSystem: {:?}", system_path);
if let Some(bridge) = &self.net_bridge {
bridge.block_socket(system_path.socket_address()).unwrap();
}
}
NetworkStatusRequest::BlockIpNet(ip_net) => {
debug!(self.ctx.log(), "Got BlockIpNet: {:?}", ip_net);
if let Some(bridge) = &self.net_bridge {
bridge.block_ip_net(ip_net).unwrap();
}
}
NetworkStatusRequest::AllowIp(ip_addr) => {
debug!(self.ctx.log(), "Got AllowIp: {:?}", ip_addr);
if let Some(bridge) = &self.net_bridge {
bridge.allow_ip(ip_addr).unwrap();
}
}
NetworkStatusRequest::AllowSystem(system_path) => {
debug!(self.ctx.log(), "Got AllowSystem: {:?}", system_path);
if let Some(bridge) = &self.net_bridge {
bridge.allow_socket(system_path.socket_address()).unwrap();
}
}
NetworkStatusRequest::AllowIpNet(ip_net) => {
debug!(self.ctx.log(), "Got AllowIpNet: {:?}", ip_net);
if let Some(bridge) = &self.net_bridge {
bridge.allow_ip_net(ip_net).unwrap();
}
}
}
Handled::Ok
}
}
impl futures::sink::Sink<DispatchEnvelope> for ActorRefStrong<DispatchEnvelope> {
type Error = ();
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: DispatchEnvelope) -> Result<(), Self::Error> {
self.tell(item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
impl futures::sink::Sink<NetMessage> for DynActorRef {
type Error = ();
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: NetMessage) -> Result<(), Self::Error> {
DynActorRef::enqueue(&self.as_ref(), item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::{super::*, *};
use crate::prelude_test::net_test_helpers::{PingerAct, PongerAct};
use std::{thread, time::Duration};
#[test]
fn network_cleanup() {
let mut cfg = KompactConfig::default();
println!("Configuring network");
cfg.system_components(DeadletterBox::new, {
let net_config =
NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
net_config.build()
});
println!("Starting KompactSystem");
let system = cfg.build().expect("KompactSystem");
println!("KompactSystem started just fine.");
let named_path = ActorPath::Named(NamedPath::with_system(
system.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
let port = system.system_path().port();
println!("Got port: {}", port);
println!("Shutting down first system...");
system
.shutdown()
.expect("KompactSystem failed to shut down!");
println!("System shut down.");
let mut cfg2 = KompactConfig::default();
println!("Configuring network");
cfg2.system_components(DeadletterBox::new, {
let net_config =
NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
net_config.build()
});
println!("Starting 2nd KompactSystem");
let system2 = cfg2.build().expect("KompactSystem");
thread::sleep(Duration::from_millis(100));
println!("2nd KompactSystem started just fine.");
let named_path2 = ActorPath::Named(NamedPath::with_system(
system2.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
assert_eq!(named_path, named_path2);
system2
.shutdown()
.expect("2nd KompactSystem failed to shut down!");
}
#[test]
fn network_cleanup_with_timeout() {
let mut cfg = KompactConfig::default();
println!("Configuring network");
cfg.system_components(DeadletterBox::new, {
let net_config =
NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
net_config.build()
});
println!("Starting KompactSystem");
let system = cfg.build().expect("KompactSystem");
println!("KompactSystem started just fine.");
let named_path = ActorPath::Named(NamedPath::with_system(
system.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
let port = system.system_path().port();
println!("Got port: {}", port);
thread::Builder::new()
.name("System1 Killer".to_string())
.spawn(move || {
thread::sleep(Duration::from_millis(100));
println!("Shutting down first system...");
system
.shutdown()
.expect("KompactSystem failed to shut down!");
println!("System shut down.");
})
.ok();
let mut cfg2 = KompactConfig::default();
println!("Configuring network");
cfg2.system_components(DeadletterBox::new, {
let net_config =
NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
net_config.build()
});
println!("Starting 2nd KompactSystem");
let system2 = cfg2.build().expect("KompactSystem");
thread::sleep(Duration::from_millis(100));
println!("2nd KompactSystem started just fine.");
let named_path2 = ActorPath::Named(NamedPath::with_system(
system2.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
assert_eq!(named_path, named_path2);
system2
.shutdown()
.expect("2nd KompactSystem failed to shut down!");
}
#[test]
fn test_system_path_timing() {
let mut cfg = KompactConfig::default();
println!("Configuring network");
cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
println!("Starting KompactSystem");
let system = cfg.build().expect("KompactSystem");
println!("KompactSystem started just fine.");
let named_path = ActorPath::Named(NamedPath::with_system(
system.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
}
#[test]
fn cleanup_bufferchunks_from_dead_actors() {
let system1 = || {
let mut cfg = KompactConfig::default();
cfg.system_components(
DeadletterBox::new,
NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work")).build(),
);
cfg.build().expect("KompactSystem")
};
let system2 = |port| {
let mut cfg = KompactConfig::default();
cfg.system_components(
DeadletterBox::new,
NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port)).build(),
);
cfg.build().expect("KompactSystem")
};
let system2a = system2(0);
let port = system2a.system_path().port();
let (ponger_named, ponf) = system2a.create_and_register(PongerAct::new_lazy);
let poaf = system2a.register_by_alias(&ponger_named, "custom_name");
ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
let named_path = ActorPath::Named(NamedPath::with_system(
system2a.system_path(),
vec!["custom_name".into()],
));
let named_path_clone = named_path;
let system1: KompactSystem = system1();
let (pinger_named, pinf) =
system1.create_and_register(move || PingerAct::new_eager(named_path_clone));
pinf.wait_expect(Duration::from_millis(1000), "Pinger failed to register!");
system2a.shutdown().ok();
system1.start(&pinger_named);
thread::sleep(Duration::from_millis(100));
system1.kill(pinger_named);
thread::sleep(Duration::from_millis(5000));
let mut garbage_len = 0;
let sc: &dyn SystemComponents = system1.get_system_components();
if let Some(cc) = sc.downcast::<CustomComponents<DeadletterBox, NetworkDispatcher>>() {
garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len());
}
assert_ne!(0, garbage_len);
println!("Setting up system2b");
let system2b = system2(port);
let (ponger_named, ponf) = system2b.create_and_register(PongerAct::new_lazy);
let poaf = system2b.register_by_alias(&ponger_named, "custom_name");
ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
println!("Starting actor on system2b");
system2b.start(&ponger_named);
thread::sleep(Duration::from_millis(10000));
if let Some(cc) = sc.downcast::<CustomComponents<DeadletterBox, NetworkDispatcher>>() {
garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len());
}
assert_eq!(0, garbage_len);
system1
.shutdown()
.expect("Kompact didn't shut down properly");
system2b
.shutdown()
.expect("Kompact didn't shut down properly");
}
}