use super::*;
use crate::{
actors::{Actor, ActorPath, Dispatcher, DynActorRef, SystemPath, Transport},
component::{Component, ComponentContext, ExecuteResult},
};
use std::{net::SocketAddr, pin::Pin, sync::Arc};
use crate::{
actors::NamedPath,
messaging::{
ActorRegistration,
DispatchData,
DispatchEnvelope,
EventEnvelope,
MsgEnvelope,
NetMessage,
PathResolvable,
PolicyRegistration,
RegistrationEnvelope,
RegistrationError,
RegistrationEvent,
RegistrationPromise,
SerialisedFrame,
},
net::{buffers::*, events::NetworkEvent, ConnectionState, NetworkBridgeErr},
timer::timer_manager::Timer,
};
use arc_swap::ArcSwap;
use futures::{
self,
task::{Context, Poll},
};
use lookup::{ActorLookup, ActorStore, InsertResult, LookupResult};
use queue_manager::QueueManager;
use rustc_hash::FxHashMap;
use std::{collections::VecDeque, io::ErrorKind, time::Duration};
pub mod lookup;
pub mod queue_manager;
/// Default value for `NetworkConfig::connection_retry_interval`, in milliseconds.
const RETRY_CONNECTIONS_INTERVAL: u64 = 5000;
/// Default value for `NetworkConfig::max_connection_retry_attempts`.
const MAX_RETRY_ATTEMPTS: u8 = 10;
/// Hash map type used for the dispatcher's connection table (an `FxHashMap`).
type NetHashMap<K, V> = FxHashMap<K, V>;
/// Configuration used to construct a [`NetworkDispatcher`].
///
/// Instances are created through [`NetworkConfig::new`], the `with_*`
/// constructors or `Default`, and tuned through the setter methods.
#[derive(Clone, Debug)]
pub struct NetworkConfig {
/// Socket address handed to the network bridge when the dispatcher starts.
addr: SocketAddr,
/// Transport recorded in the dispatcher's own `SystemPath`.
transport: Transport,
/// Buffer settings used when creating the dispatcher's `EncodeBuffer`.
buffer_config: BufferConfig,
/// Optional custom chunk allocator passed along with `buffer_config`.
custom_allocator: Option<Arc<dyn ChunkAllocator>>,
/// Value reported by `get_tcp_nodelay`.
/// NOTE(review): presumably toggles TCP_NODELAY on the network thread's sockets — confirm in `net`.
tcp_nodelay: bool,
/// Number of retry rounds allowed for a remote address before its queue is dropped.
max_connection_retry_attempts: u8,
/// Interval between retry rounds, in milliseconds.
connection_retry_interval: u64,
}
impl NetworkConfig {
    /// Creates a configuration that binds to `addr` and leaves every other
    /// setting at its default value.
    pub fn new(addr: SocketAddr) -> Self {
        NetworkConfig {
            addr,
            ..Self::default()
        }
    }

    /// Creates a configuration that binds to `addr` and uses `buffer_config`.
    ///
    /// `buffer_config.validate()` is invoked before the configuration is built.
    pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
        buffer_config.validate();
        NetworkConfig {
            addr,
            buffer_config,
            ..Self::default()
        }
    }

    /// Creates a configuration that binds to `addr`, uses `buffer_config` and
    /// the given `custom_allocator`.
    ///
    /// `buffer_config.validate()` is invoked before the configuration is built.
    pub fn with_custom_allocator(
        addr: SocketAddr,
        buffer_config: BufferConfig,
        custom_allocator: Arc<dyn ChunkAllocator>,
    ) -> Self {
        buffer_config.validate();
        NetworkConfig {
            addr,
            buffer_config,
            custom_allocator: Some(custom_allocator),
            ..Self::default()
        }
    }

    /// Replaces the socket address and returns the updated configuration.
    pub fn with_socket(mut self, addr: SocketAddr) -> Self {
        self.addr = addr;
        self
    }

    /// Turns this configuration into a factory closure that builds a
    /// [`NetworkDispatcher`] from a ready-notification promise.
    ///
    /// The configuration is cloned on every invocation of the closure.
    pub fn build(self) -> impl Fn(KPromise<()>) -> NetworkDispatcher {
        move |ready_promise| NetworkDispatcher::with_config(self.clone(), ready_promise)
    }

    /// Returns the buffer configuration.
    pub fn get_buffer_config(&self) -> &BufferConfig {
        &self.buffer_config
    }

    /// Replaces the buffer configuration. No validation is performed here.
    pub fn set_buffer_config(&mut self, buffer_config: BufferConfig) -> () {
        self.buffer_config = buffer_config;
    }

    /// Returns the custom allocator, if one was configured.
    pub fn get_custom_allocator(&self) -> &Option<Arc<dyn ChunkAllocator>> {
        &self.custom_allocator
    }

    /// Returns the configured `tcp_nodelay` flag.
    pub fn get_tcp_nodelay(&self) -> bool {
        self.tcp_nodelay
    }

    /// Sets the `tcp_nodelay` flag.
    pub fn set_tcp_nodelay(&mut self, nodelay: bool) {
        self.tcp_nodelay = nodelay;
    }

    /// Sets how many retry rounds are attempted for a remote address.
    pub fn set_max_connection_retry_attempts(&mut self, count: u8) {
        self.max_connection_retry_attempts = count;
    }

    /// Returns how many retry rounds are attempted for a remote address.
    pub fn get_max_connection_retry_attempts(&self) -> u8 {
        self.max_connection_retry_attempts
    }

    /// Sets the interval between retry rounds, in milliseconds.
    pub fn set_connection_retry_interval(&mut self, milliseconds: u64) {
        self.connection_retry_interval = milliseconds;
    }

    /// Returns the interval between retry rounds, in milliseconds.
    pub fn get_connection_retry_interval(&self) -> u64 {
        self.connection_retry_interval
    }
}
impl Default for NetworkConfig {
fn default() -> Self {
NetworkConfig {
addr: "127.0.0.1:0".parse().unwrap(),
transport: Transport::TCP,
buffer_config: BufferConfig::default(),
custom_allocator: None,
tcp_nodelay: false,
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
}
}
}
/// Dispatcher component that routes messages to local actors via a lookup
/// table and to remote systems via a network bridge.
#[derive(ComponentDefinition)]
pub struct NetworkDispatcher {
/// Component context of this dispatcher.
ctx: ComponentContext<NetworkDispatcher>,
/// Last known connection state per remote socket address.
connections: NetHashMap<SocketAddr, ConnectionState>,
/// Configuration the dispatcher was created with.
cfg: NetworkConfig,
/// Actor lookup table; a clone of this handle is given to the network bridge on start.
lookup: Arc<ArcSwap<ActorStore>>,
/// Network bridge; `None` until `start` ran and again after `do_stop`.
net_bridge: Option<net::Bridge>,
/// Cached system path, filled lazily by `system_path()`.
system_path: Option<SystemPath>,
/// Frames queued per remote address while no connection is established.
queue_manager: QueueManager,
/// Reaper that is run periodically over `lookup` (see `schedule_reaper`).
reaper: lookup::gc::ActorRefReaper,
/// Promise fulfilled once the network was started successfully.
notify_ready: Option<KPromise<()>>,
/// Buffer used to serialise outgoing remote messages.
encode_buffer: EncodeBuffer,
/// Number of retry rounds already performed per remote address.
retry_map: FxHashMap<SocketAddr, u8>,
/// Buffer chunks received via `DispatchEnvelope::LockedChunk` that could not be freed yet.
garbage_buffers: VecDeque<BufferChunk>,
}
impl NetworkDispatcher {
pub fn new(notify_ready: KPromise<()>) -> Self {
let config = NetworkConfig::default();
NetworkDispatcher::with_config(config, notify_ready)
}
pub fn with_config(cfg: NetworkConfig, notify_ready: KPromise<()>) -> Self {
let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new()));
let reaper = lookup::gc::ActorRefReaper::default();
let encode_buffer = crate::net::buffers::EncodeBuffer::with_config(
&cfg.buffer_config,
&cfg.custom_allocator,
);
NetworkDispatcher {
ctx: ComponentContext::uninitialised(),
connections: Default::default(),
cfg,
lookup,
net_bridge: None,
system_path: None,
queue_manager: QueueManager::new(),
reaper,
notify_ready: Some(notify_ready),
encode_buffer,
garbage_buffers: VecDeque::new(),
retry_map: Default::default(),
}
}
/// Returns a reference to the cached system path, computing and caching it
/// first if necessary.
///
/// # Panics
///
/// Panics (inside `system_path()`) if the path is not cached yet and the
/// network bridge is not available.
pub fn system_path_ref(&mut self) -> &SystemPath {
    if self.system_path.is_none() {
        // Called only for its side effect of filling the cache.
        let _ = self.system_path();
    }
    match self.system_path {
        Some(ref path) => path,
        None => unreachable!(
            "Cached value should have been filled by calling self.system_path()!"
        ),
    }
}
/// Creates the network bridge, registers the deadletter box under
/// `PathResolvable::System` in the lookup table and starts the periodic
/// connection-retry timer.
///
/// In its current form this function always returns `Ok(())`.
fn start(&mut self) -> Result<(), net::NetworkBridgeErr> {
    debug!(self.ctx.log(), "Starting self and network bridge");
    let dispatcher = self
        .actor_ref()
        .hold()
        .expect("Self can hardly be deallocated!");
    let bridge_logger = self.ctx.log().new(o!("owner" => "Bridge"));
    let network_thread_logger = self.ctx.log().new(o!("owner" => "NetworkThread"));
    let (mut bridge, _addr) = net::Bridge::new(
        self.lookup.clone(),
        network_thread_logger,
        bridge_logger,
        self.cfg.addr,
        dispatcher.clone(),
        &self.cfg,
    );
    let deadletter: DynActorRef = self.ctx.system().deadletter_ref().dyn_ref();
    // Copy-and-swap update of the shared lookup table. The closure clones its
    // captured values because `rcu` may invoke it more than once.
    self.lookup.rcu(|current| {
        let mut next = ActorStore::clone(&current);
        next.insert(PathResolvable::System, deadletter.clone())
            .expect("Deadletter shouldn't error");
        next
    });
    bridge.set_dispatcher(dispatcher);
    self.schedule_retries();
    self.net_bridge = Some(bridge);
    Ok(())
}
/// Stops the network bridge; delegates to `do_stop` with `_cleanup = false`.
fn stop(&mut self) {
    self.do_stop(false);
}
/// Kills the network bridge; delegates to `do_stop` with `_cleanup = true`.
fn kill(&mut self) {
    self.do_stop(true);
}
/// Takes the network bridge out of the dispatcher (if present) and stops it,
/// logging any error reported by the bridge.
///
/// The `_cleanup` flag is currently unused.
fn do_stop(&mut self, _cleanup: bool) {
    let bridge = match self.net_bridge.take() {
        Some(bridge) => bridge,
        None => return,
    };
    if let Err(e) = bridge.stop() {
        error!(
            self.ctx().log(),
            "NetworkBridge did not shut down as expected! Error was:\n {:?}\n", e
        );
    }
}
fn schedule_reaper(&mut self) {
if !self.reaper.is_scheduled() {
self.reaper.schedule();
} else {
let num_reaped = self.reaper.run(&self.lookup);
if num_reaped == 0 {
self.reaper.strategy_mut().incr();
} else {
self.reaper.strategy_mut().decr();
}
}
let next_wakeup = self.reaper.strategy().curr();
debug!(
self.ctx().log(),
"Scheduling reaping at {:?}ms", next_wakeup
);
let mut retry_queue = VecDeque::new();
for mut trash in self.garbage_buffers.drain(..) {
if !trash.free() {
retry_queue.push_back(trash);
}
}
self.garbage_buffers.append(&mut retry_queue);
self.schedule_once(Duration::from_millis(next_wakeup), move |target, _id| {
target.schedule_reaper();
Handled::Ok
});
}
fn schedule_retries(&mut self) {
let drain = self.retry_map.clone();
self.retry_map.clear();
for (addr, retry) in drain {
if retry < self.cfg.max_connection_retry_attempts {
self.retry_map.insert(addr, retry + 1);
if let Some(bridge) = &self.net_bridge {
debug!(
self.ctx().log(),
"Dispatcher retrying connection to host {}, attempt {}/{}",
addr,
retry,
self.cfg.max_connection_retry_attempts
);
bridge.connect(Transport::TCP, addr).unwrap();
}
} else {
info!(
self.ctx().log(),
"Dispatcher giving up on remote host {}, dropping queues", addr
);
self.queue_manager.drop_queue(&addr);
self.connections.remove(&addr);
}
}
self.schedule_once(
Duration::from_millis(self.cfg.connection_retry_interval),
move |target, _id| {
target.schedule_retries();
Handled::Ok
},
);
}
/// Handles an event envelope delivered to the dispatcher.
///
/// Connection events are forwarded to `on_conn_state` (errors are logged),
/// data events are only logged, and rejected frames are put back into the
/// queue manager as priority frames.
fn on_event(&mut self, ev: EventEnvelope) {
    match ev {
        EventEnvelope::Network(NetworkEvent::Connection(addr, conn_state)) => {
            if let Err(e) = self.on_conn_state(addr, conn_state) {
                error!(
                    self.ctx().log(),
                    "Error while connecting to {}, \n{:?}", addr, e
                )
            }
        }
        EventEnvelope::Network(NetworkEvent::Data(_)) => {
            debug!(self.ctx().log(), "Received important data!");
        }
        EventEnvelope::Network(NetworkEvent::RejectedFrame(addr, frame)) => {
            self.queue_manager.enqueue_priority_frame(frame, addr);
        }
    }
}
fn on_conn_state(
&mut self,
addr: SocketAddr,
mut state: ConnectionState,
) -> Result<(), NetworkBridgeErr> {
use self::ConnectionState::*;
match state {
Connected(ref mut _frame_sender) => {
info!(
self.ctx().log(),
"registering newly connected conn at {:?}", addr
);
let _ = self.retry_map.remove(&addr);
if self.queue_manager.has_frame(&addr) {
while let Some(frame) = self.queue_manager.pop_frame(&addr) {
if let Some(bridge) = &self.net_bridge {
bridge.route(addr, frame, net::Protocol::TCP)?;
}
}
}
}
Closed => {
if self.retry_map.get(&addr).is_none() {
warn!(self.ctx().log(), "connection closed for {:?}", addr);
self.retry_map.insert(addr, 0);
}
if let Some(bridge) = &self.net_bridge {
bridge.ack_closed(addr)?;
}
}
Error(ref err) => {
match err {
x if x.kind() == ErrorKind::ConnectionRefused => {
error!(self.ctx().log(), "connection refused for {:?}", addr);
}
why => {
error!(
self.ctx().log(),
"connection error for {:?}: {:?}", addr, why
);
}
}
}
ref _other => (),
}
self.connections.insert(addr, state);
Ok(())
}
/// Delivers `msg` to an actor (or routing group) of this system.
///
/// The destination is looked up in the lookup table. If nothing is found, or
/// the lookup itself fails, the message is forwarded to the deadletter box.
/// If the message cannot be converted into a local `NetMessage`, it is logged
/// and dropped. All of these cases return `Ok(())`.
fn route_local<R>(&mut self, msg: R) -> Result<(), NetworkBridgeErr>
where
    R: Routable,
{
    let lookup = self.lookup.load();
    // The lookup has to happen before `msg` is consumed by `into_local`.
    let lookup_result = lookup.get_by_actor_path(msg.destination());
    let netmsg = match msg.into_local() {
        Ok(netmsg) => netmsg,
        Err(e) => {
            error!(self.log(), "Could not serialise msg: {:?}. Dropping...", e);
            return Ok(());
        }
    };
    match lookup_result {
        LookupResult::Ref(actor) => {
            actor.enqueue(netmsg);
        }
        LookupResult::Group(group) => {
            group.route(netmsg, self.log());
        }
        LookupResult::None => {
            error!(
                self.ctx.log(),
                "No local actor found at {:?}. Forwarding to DeadletterBox",
                netmsg.receiver,
            );
            self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
        }
        LookupResult::Err(e) => {
            error!(
                self.ctx.log(),
                "An error occurred during local actor lookup at {:?}. Forwarding to DeadletterBox. The error was: {}",
                netmsg.receiver,
                e
            );
            self.ctx.deadletter_ref().enqueue(MsgEnvelope::Net(netmsg));
        }
    }
    Ok(())
}
/// Serialises `msg` and sends it to its remote destination using the
/// transport named in the destination path.
///
/// # Errors
///
/// Returns an error if serialisation fails or if the transport-specific
/// routing function reports one.
///
/// # Panics
///
/// Panics via `unimplemented!` for transports other than TCP and UDP.
fn route_remote<R>(&mut self, msg: R) -> Result<(), NetworkBridgeErr>
where
    R: Routable,
{
    let dst = msg.destination();
    let addr = SocketAddr::new(*dst.address(), dst.port());
    let protocol: Transport = dst.protocol();
    // Scope the encoder so it is released before `self` is borrowed again below.
    let serialised = {
        let mut encoder = self.encode_buffer.get_buffer_encoder();
        msg.into_serialised(&mut encoder)?
    };
    match protocol {
        Transport::TCP => self.route_remote_tcp(addr, serialised),
        Transport::UDP => self.route_remote_udp(addr, serialised),
        x => unimplemented!("Unsupported protocol: {}", x),
    }
}
/// Sends an already serialised frame to `addr` over UDP.
///
/// If the network bridge is not available, the frame is dropped with a warning.
///
/// # Errors
///
/// Returns the error reported by the bridge when routing fails.
fn route_remote_udp(
    &mut self,
    addr: SocketAddr,
    serialised: SerialisedFrame,
) -> Result<(), NetworkBridgeErr> {
    match &self.net_bridge {
        Some(bridge) => {
            bridge.route(addr, serialised, net::Protocol::UDP)?;
        }
        None => {
            warn!(
                self.ctx.log(),
                "Dropping UDP message to {}, as bridge is not connected.", addr
            );
        }
    }
    Ok(())
}
fn route_remote_tcp(
&mut self,
addr: SocketAddr,
serialised: SerialisedFrame,
) -> Result<(), NetworkBridgeErr> {
let state: &mut ConnectionState =
self.connections.entry(addr).or_insert(ConnectionState::New);
let next: Option<ConnectionState> = match *state {
ConnectionState::New => {
debug!(
self.ctx.log(),
"No connection found; establishing and queuing frame"
);
self.queue_manager.enqueue_frame(serialised, addr);
if let Some(ref mut bridge) = self.net_bridge {
debug!(self.ctx.log(), "Establishing new connection to {:?}", addr);
self.retry_map.insert(addr, 0);
bridge.connect(Transport::TCP, addr).unwrap();
Some(ConnectionState::Initializing)
} else {
error!(self.ctx.log(), "No network bridge found; dropping message");
Some(ConnectionState::Closed)
}
}
ConnectionState::Connected(_) => {
if self.queue_manager.has_frame(&addr) {
self.queue_manager.enqueue_frame(serialised, addr);
if let Some(bridge) = &self.net_bridge {
while let Some(frame) = self.queue_manager.pop_frame(&addr) {
bridge.route(addr, frame, net::Protocol::TCP)?;
}
}
None
} else {
if let Some(bridge) = &self.net_bridge {
bridge.route(addr, serialised, net::Protocol::TCP)?;
}
None
}
}
ConnectionState::Initializing => {
self.queue_manager.enqueue_frame(serialised, addr);
None
}
ConnectionState::Closed => {
self.queue_manager.enqueue_frame(serialised, addr);
None
}
_ => None,
};
if let Some(next) = next {
*state = next;
}
Ok(())
}
/// Turns a `PathResolvable` into a concrete `ActorPath` within this system.
///
/// Explicit paths are cloned as-is. Aliases, segments and actor ids are
/// combined with this system's path, and `System` resolves to the deadletter
/// path.
///
/// # Errors
///
/// Returns a `PathParseError` if an alias or segment list cannot be turned
/// into a named path.
fn resolve_path(&mut self, resolvable: &PathResolvable) -> Result<ActorPath, PathParseError> {
    let resolved: ActorPath = match resolvable {
        PathResolvable::Path(actor_path) => actor_path.clone(),
        PathResolvable::Alias(alias) => {
            self.system_path().into_named_with_string(alias)?.into()
        }
        PathResolvable::Segments(segments) => self
            .system_path()
            .into_named_with_vec(segments.to_vec())?
            .into(),
        PathResolvable::ActorId(id) => self.system_path().into_unique(*id).into(),
        PathResolvable::System => self.deadletter_path(),
    };
    Ok(resolved)
}
/// Routes `msg` either locally or remotely.
///
/// Messages whose destination system equals this system, or whose
/// destination uses the `LOCAL` transport, are delivered locally. TCP and UDP
/// destinations on other systems are sent over the network.
fn route<R>(&mut self, msg: R) -> Result<(), NetworkBridgeErr>
where
    R: Routable,
{
    let targets_this_system = self.system_path_ref() == msg.destination().system();
    if targets_this_system {
        return self.route_local(msg);
    }
    let proto = msg.destination().system().protocol();
    match proto {
        Transport::LOCAL => self.route_local(msg),
        Transport::TCP | Transport::UDP => self.route_remote(msg),
    }
}
/// Returns the deadletter path: a named path in this system with no segments.
fn deadletter_path(&mut self) -> ActorPath {
    let system = self.system_path();
    ActorPath::Named(NamedPath::with_system(system, Vec::new()))
}
/// Registers `registration.actor` under `registration.path` in the lookup table.
///
/// If the path is already registered and `update` is `false`, the
/// registration fails with `RegistrationError::DuplicateEntry`. Otherwise the
/// entry is inserted (replacing any previous one). On success the reaper is
/// scheduled if it is not running yet. The outcome is reported through
/// `promise`, if one was supplied.
fn register_actor(
    &mut self,
    registration: ActorRegistration,
    update: bool,
    promise: RegistrationPromise,
) {
    let ActorRegistration { actor, path } = registration;
    let res = self
        .resolve_path(&path)
        .map_err(RegistrationError::InvalidPath)
        .and_then(|ap| {
            let lease = self.lookup.load();
            if lease.contains(&path) && !update {
                warn!(
                    self.ctx.log(),
                    "Detected duplicate path during registration. The path will not be re-registered"
                );
                drop(lease);
                Err(RegistrationError::DuplicateEntry)
            } else {
                drop(lease);
                let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
                // Copy-and-swap update. The closure clones its captured values
                // because `rcu` may invoke it more than once; `result` keeps
                // the outcome of the last invocation.
                self.lookup.rcu(|current| {
                    let mut next = ActorStore::clone(&current);
                    result = next.insert(path.clone(), actor.clone());
                    next
                });
                if let Ok(ref res) = result {
                    if !res.is_empty() {
                        info!(self.ctx.log(), "Replaced entry for path={:?}", path);
                    }
                }
                result.map(|_| ap)
                    .map_err(RegistrationError::InvalidPath)
            }
        });
    if res.is_ok() && !self.reaper.is_scheduled() {
        self.schedule_reaper();
    }
    match promise {
        RegistrationPromise::Fulfil(promise) => {
            promise.fulfil(res).unwrap_or_else(|e| {
                error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
            });
        }
        RegistrationPromise::None => (),
    }
}
/// Registers a routing policy for the path segments in `registration.path`.
///
/// If the path is already registered and `update` is `false`, the
/// registration fails with `RegistrationError::DuplicateEntry`. Otherwise the
/// policy is set in the lookup table (replacing any previous entry). The
/// outcome is reported through `promise`, if one was supplied.
fn register_policy(
    &mut self,
    registration: PolicyRegistration,
    update: bool,
    promise: RegistrationPromise,
) {
    let PolicyRegistration { policy, path } = registration;
    let lease = self.lookup.load();
    let path_res = PathResolvable::Segments(path);
    let res = self
        .resolve_path(&path_res)
        .map_err(RegistrationError::InvalidPath)
        .and_then(|ap| {
            if lease.contains(&path_res) && !update {
                warn!(
                    self.ctx.log(),
                    "Detected duplicate path during registration. The path will not be re-registered",
                );
                drop(lease);
                Err(RegistrationError::DuplicateEntry)
            } else {
                drop(lease);
                // Unwrap the segments again; `path_res` was built as `Segments` above.
                let_irrefutable!(path, PathResolvable::Segments(path) = path_res);
                let mut result: Result<InsertResult, PathParseError> = Ok(InsertResult::None);
                // Copy-and-swap update. The closure clones the policy because
                // `rcu` may invoke it more than once; `result` keeps the
                // outcome of the last invocation.
                self.lookup.rcu(|current| {
                    let mut next = ActorStore::clone(&current);
                    result = next.set_routing_policy(&path, policy.clone());
                    next
                });
                if let Ok(ref res) = result {
                    if !res.is_empty() {
                        info!(self.ctx.log(), "Replaced entry for path={:?}", path);
                    }
                }
                result.map(|_| ap).map_err(RegistrationError::InvalidPath)
            }
        });
    match promise {
        RegistrationPromise::Fulfil(promise) => {
            promise.fulfil(res).unwrap_or_else(|e| {
                error!(self.ctx.log(), "Could not notify listeners: {:?}", e)
            });
        }
        RegistrationPromise::None => (),
    }
}
}
impl Actor for NetworkDispatcher {
    type Message = DispatchEnvelope;

    /// Handles a dispatch envelope: routes messages, processes registrations,
    /// reacts to network events and collects locked buffer chunks.
    ///
    /// Routing failures are logged; the handler always returns `Handled::Ok`.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        match msg {
            DispatchEnvelope::Msg { src, dst, msg } => {
                if let Err(e) = self.route((src, dst, msg)) {
                    error!(self.ctx.log(), "Failed to route message: {:?}", e);
                }
            }
            DispatchEnvelope::ForwardedMsg { msg } => {
                if let Err(e) = self.route(msg) {
                    error!(self.ctx.log(), "Failed to route message: {:?}", e);
                }
            }
            DispatchEnvelope::Registration(RegistrationEnvelope {
                event,
                update,
                promise,
            }) => match event {
                RegistrationEvent::Actor(rea) => self.register_actor(rea, update, promise),
                RegistrationEvent::Policy(rep) => self.register_policy(rep, update, promise),
            },
            DispatchEnvelope::Event(ev) => self.on_event(ev),
            DispatchEnvelope::LockedChunk(trash) => self.garbage_buffers.push_back(trash),
        }
        Handled::Ok
    }

    /// Network messages addressed to the dispatcher itself are only logged.
    fn receive_network(&mut self, msg: NetMessage) -> Handled {
        warn!(self.ctx.log(), "Received network message: {:?}", msg,);
        Handled::Ok
    }
}
impl Dispatcher for NetworkDispatcher {
    /// Returns this system's path, computing and caching it on first use from
    /// the configured transport and the bridge's bound address.
    ///
    /// # Panics
    ///
    /// Panics if the path is not cached yet and the network bridge is missing
    /// or does not report a local address.
    fn system_path(&mut self) -> SystemPath {
        if let Some(ref cached) = self.system_path {
            return cached.clone();
        }
        let bound_addr = match self.net_bridge {
            Some(ref net_bridge) => net_bridge
                .local_addr()
                .clone()
                .expect("If net bridge is ready, port should be as well!"),
            None => panic!(
                "You must wait until the socket is bound before attempting to create a system path!"
            ),
        };
        let path = SystemPath::new(self.cfg.transport, bound_addr.ip(), bound_addr.port());
        self.system_path = Some(path.clone());
        path
    }
}
impl ComponentLifecycle for NetworkDispatcher {
fn on_start(&mut self) -> Handled {
info!(self.ctx.log(), "Starting network...");
let res = self.start();
match res {
Ok(_) => {
info!(self.ctx.log(), "Started network just fine.");
if let Some(promise) = self.notify_ready.take() {
promise.fulfil(()).unwrap_or_else(|e| {
error!(self.ctx.log(), "Could not start network! {:?}", e)
})
}
}
Err(e) => {
error!(self.ctx.log(), "Could not start network! {:?}", e);
panic!("Kill me now!");
}
}
Handled::Ok
}
fn on_stop(&mut self) -> Handled {
info!(self.ctx.log(), "Stopping network...");
self.stop();
info!(self.ctx.log(), "Stopped network.");
Handled::Ok
}
fn on_kill(&mut self) -> Handled {
info!(self.ctx.log(), "Killing network...");
self.kill();
info!(self.ctx.log(), "Killed network.");
Handled::Ok
}
}
/// Lets a strong reference to the dispatcher be used as a `Sink`: every item
/// is forwarded with `tell`, and all poll methods report immediate readiness.
impl futures::sink::Sink<DispatchEnvelope> for ActorRefStrong<DispatchEnvelope> {
type Error = ();
// Always ready; this sink never signals back-pressure.
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
// Forwards the envelope to the referenced actor; never fails.
fn start_send(self: Pin<&mut Self>, item: DispatchEnvelope) -> Result<(), Self::Error> {
self.tell(item);
Ok(())
}
// Nothing is buffered in the sink itself, so flushing completes immediately.
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
// Closing is a no-op.
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
/// Lets a `DynActorRef` be used as a `Sink` for network messages: every item
/// is enqueued on the referenced actor, and all poll methods report immediate
/// readiness.
impl futures::sink::Sink<NetMessage> for DynActorRef {
type Error = ();
// Always ready; this sink never signals back-pressure.
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
// Enqueues the message on the referenced actor; never fails.
fn start_send(self: Pin<&mut Self>, item: NetMessage) -> Result<(), Self::Error> {
DynActorRef::enqueue(&self.as_ref(), item);
Ok(())
}
// Nothing is buffered in the sink itself, so flushing completes immediately.
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
// Closing is a no-op.
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
/// A message the dispatcher can route, either to a local actor or, in
/// serialised form, to a remote system.
trait Routable {
/// Path of the sender.
fn source(&self) -> &ActorPath;
/// Path of the receiver; used to decide between local and remote routing.
fn destination(&self) -> &ActorPath;
/// Consumes the message and serialises it into a frame using `buf`.
fn into_serialised(self, buf: &mut BufferEncoder) -> Result<SerialisedFrame, SerError>;
/// Consumes the message and converts it into a `NetMessage` for local delivery.
fn into_local(self) -> Result<NetMessage, SerError>;
}
impl Routable for NetMessage {
    /// The sender recorded in the message.
    fn source(&self) -> &ActorPath {
        &self.sender
    }

    /// The receiver recorded in the message.
    fn destination(&self) -> &ActorPath {
        &self.receiver
    }

    /// Embeds the whole message into `buf` and wraps the result as a
    /// `SerialisedFrame::ChunkLease`.
    fn into_serialised(self, buf: &mut BufferEncoder) -> Result<SerialisedFrame, SerError> {
        let lease = crate::ser_helpers::embed_msg(self, buf)?;
        Ok(SerialisedFrame::ChunkLease(lease))
    }

    /// A `NetMessage` is already in local form, so this cannot fail.
    fn into_local(self) -> Result<NetMessage, SerError> {
        Ok(self)
    }
}
/// A `(source, destination, payload)` triple as produced by `DispatchEnvelope::Msg`.
impl Routable for (ActorPath, ActorPath, DispatchData) {
    /// First tuple element: the sender's path.
    fn source(&self) -> &ActorPath {
        &self.0
    }

    /// Second tuple element: the receiver's path.
    fn destination(&self) -> &ActorPath {
        &self.1
    }

    /// Delegates serialisation to the payload, passing both paths along.
    fn into_serialised(self, buf: &mut BufferEncoder) -> Result<SerialisedFrame, SerError> {
        let (src, dst, data) = self;
        data.into_serialised(src, dst, buf)
    }

    /// Delegates the local conversion to the payload, passing both paths along.
    fn into_local(self) -> Result<NetMessage, SerError> {
        let (src, dst, data) = self;
        data.into_local(src, dst)
    }
}
#[cfg(test)]
mod tests {
use super::{super::*, *};
use crate::prelude_test::net_test_helpers::{PingerAct, PongerAct};
use std::{thread, time::Duration};
// Expects system start-up to panic with "KompactSystem: Poisoned" when the
// dispatcher is configured with 127.0.0.1:80.
// NOTE(review): presumably relies on port 80 not being bindable (privileged
// port); the test is `#[ignore]`d — confirm the reason.
#[test]
#[ignore]
#[should_panic(expected = "KompactSystem: Poisoned")]
fn failed_network() {
let mut cfg = KompactConfig::new();
println!("Configuring network");
cfg.system_components(DeadletterBox::new, {
let net_config =
NetworkConfig::new("127.0.0.1:80".parse().expect("Address should work"));
net_config.build()
});
println!("Starting KompactSystem");
let system = cfg.build().expect("KompactSystem");
// Give the network some time to fail before touching the system path.
thread::sleep(Duration::from_secs(1));
println!("KompactSystem started just fine.");
let named_path = ActorPath::Named(NamedPath::with_system(
system.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
}
// Starts a system on an ephemeral port, shuts it down, then starts a second
// system on the very same port and checks that both produce the same path.
#[test]
fn network_cleanup() {
    let mut cfg = KompactConfig::new();
    println!("Configuring network");
    cfg.system_components(DeadletterBox::new, {
        let net_config =
            NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
        net_config.build()
    });
    println!("Starting KompactSystem");
    let system = cfg.build().expect("KompactSystem");
    println!("KompactSystem started just fine.");
    let named_path = ActorPath::Named(NamedPath::with_system(
        system.system_path(),
        vec!["test".into()],
    ));
    println!("Got path: {}", named_path);
    let port = system.system_path().port();
    println!("Got port: {}", port);
    println!("Shutting down first system...");
    system
        .shutdown()
        .expect("KompactSystem failed to shut down!");
    println!("System shut down.");
    let mut cfg2 = KompactConfig::new();
    println!("Configuring network");
    cfg2.system_components(DeadletterBox::new, {
        let net_config =
            NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
        net_config.build()
    });
    println!("Starting 2nd KompactSystem");
    let system2 = cfg2.build().expect("KompactSystem");
    thread::sleep(Duration::from_millis(100));
    println!("2nd KompactSystem started just fine.");
    let named_path2 = ActorPath::Named(NamedPath::with_system(
        system2.system_path(),
        vec!["test".into()],
    ));
    // Print the second system's path (the first one was printed above).
    println!("Got path: {}", named_path2);
    assert_eq!(named_path, named_path2);
    system2
        .shutdown()
        .expect("2nd KompactSystem failed to shut down!");
}
// Like `network_cleanup`, but the first system is shut down from a separate
// thread after a short delay, while the second system is already being
// started on the same port.
#[test]
fn network_cleanup_with_timeout() {
    let mut cfg = KompactConfig::new();
    println!("Configuring network");
    cfg.system_components(DeadletterBox::new, {
        let net_config =
            NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work"));
        net_config.build()
    });
    println!("Starting KompactSystem");
    let system = cfg.build().expect("KompactSystem");
    println!("KompactSystem started just fine.");
    let named_path = ActorPath::Named(NamedPath::with_system(
        system.system_path(),
        vec!["test".into()],
    ));
    println!("Got path: {}", named_path);
    let port = system.system_path().port();
    println!("Got port: {}", port);
    // The result of spawning is deliberately ignored via `.ok()`.
    thread::Builder::new()
        .name("System1 Killer".to_string())
        .spawn(move || {
            thread::sleep(Duration::from_millis(100));
            println!("Shutting down first system...");
            system
                .shutdown()
                .expect("KompactSystem failed to shut down!");
            println!("System shut down.");
        })
        .ok();
    let mut cfg2 = KompactConfig::new();
    println!("Configuring network");
    cfg2.system_components(DeadletterBox::new, {
        let net_config =
            NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port));
        net_config.build()
    });
    println!("Starting 2nd KompactSystem");
    let system2 = cfg2.build().expect("KompactSystem");
    thread::sleep(Duration::from_millis(100));
    println!("2nd KompactSystem started just fine.");
    let named_path2 = ActorPath::Named(NamedPath::with_system(
        system2.system_path(),
        vec!["test".into()],
    ));
    // Print the second system's path (the first one was printed above).
    println!("Got path: {}", named_path2);
    assert_eq!(named_path, named_path2);
    system2
        .shutdown()
        .expect("2nd KompactSystem failed to shut down!");
}
// Checks that the system path can be requested immediately after the system
// was built with the default network configuration, without sleeping first.
#[test]
fn test_system_path_timing() {
let mut cfg = KompactConfig::new();
println!("Configuring network");
cfg.system_components(DeadletterBox::new, NetworkConfig::default().build());
println!("Starting KompactSystem");
let system = cfg.build().expect("KompactSystem");
println!("KompactSystem started just fine.");
let named_path = ActorPath::Named(NamedPath::with_system(
system.system_path(),
vec!["test".into()],
));
println!("Got path: {}", named_path);
}
// Verifies that buffer chunks handed to the dispatcher after an actor was
// killed end up in `garbage_buffers`, and that the list is empty again once
// the remote system is reachable again.
#[test]
fn cleanup_bufferchunks_from_dead_actors() {
    let system1 = || {
        let mut cfg = KompactConfig::new();
        cfg.system_components(
            DeadletterBox::new,
            NetworkConfig::new("127.0.0.1:0".parse().expect("Address should work")).build(),
        );
        cfg.build().expect("KompactSystem")
    };
    let system2 = |port| {
        let mut cfg = KompactConfig::new();
        cfg.system_components(
            DeadletterBox::new,
            NetworkConfig::new(SocketAddr::new("127.0.0.1".parse().unwrap(), port)).build(),
        );
        cfg.build().expect("KompactSystem")
    };

    // First incarnation of the remote system; remember its port for later.
    let system2a = system2(0);
    let port = system2a.system_path().port();
    let (ponger_named, ponf) = system2a.create_and_register(PongerAct::new_lazy);
    let poaf = system2a.register_by_alias(&ponger_named, "custom_name");
    ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
    poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
    let named_path = ActorPath::Named(NamedPath::with_system(
        system2a.system_path(),
        vec!["custom_name".into()],
    ));
    let named_path_clone = named_path.clone();

    let system1: KompactSystem = system1();
    let (pinger_named, pinf) =
        system1.create_and_register(move || PingerAct::new_eager(named_path_clone));
    pinf.wait_expect(Duration::from_millis(1000), "Pinger failed to register!");

    // Take the remote side down, let the pinger send, then kill the pinger.
    system2a.shutdown().ok();
    system1.start(&pinger_named);
    thread::sleep(Duration::from_millis(100));
    system1.kill(pinger_named);
    thread::sleep(Duration::from_millis(5000));

    let mut garbage_len = 0;
    let sc: &dyn SystemComponents = system1.get_system_components();
    if let Some(cc) = sc.downcast::<CustomComponents<DeadletterBox, NetworkDispatcher>>() {
        garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len());
    }
    assert_ne!(0, garbage_len);

    // Bring the remote system back on the same port.
    println!("Setting up system2b");
    let system2b = system2(port);
    let (ponger_named, ponf) = system2b.create_and_register(PongerAct::new_lazy);
    let poaf = system2b.register_by_alias(&ponger_named, "custom_name");
    ponf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
    poaf.wait_expect(Duration::from_millis(1000), "Ponger failed to register!");
    println!("Starting actor on system2b");
    system2b.start(&ponger_named);
    thread::sleep(Duration::from_millis(10000));

    if let Some(cc) = sc.downcast::<CustomComponents<DeadletterBox, NetworkDispatcher>>() {
        garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len());
    }
    assert_eq!(0, garbage_len);

    system1
        .shutdown()
        .expect("Kompact didn't shut down properly");
    system2b
        .shutdown()
        .expect("Kompact didn't shut down properly");
}
}