mod resolver;
mod writer;
use actix::prelude::*;
use log::*;
use std::io::Error;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use crate::cluster::{Cluster, Gossip, NodeEvents};
use crate::codec::{ClusterMessage, ConnectCodec};
use crate::network::resolver::{Connect, Resolver};
use crate::network::writer::Writer;
use crate::remote::{AddrRepresentation, AddrResolver, RemoteAddr, RemoteWrapper};
use crate::{ConnectionApproval, ConnectionApprovalResponse, CustomSystemService};
use actix::io::WriteHandler;
use std::fmt;
use std::thread::sleep;
use tokio::time::Duration;
use tokio_util::codec::FramedRead;
/// Actor that owns the TCP link to one remote cluster node.
///
/// It either dials `addr` itself (constructed via `new`) or takes over an
/// already accepted stream (constructed via `from_stream`), frames the socket
/// with `ConnectCodec`, and relays `ClusterMessage`s between the socket and
/// local actors.
pub struct NetworkInterface {
/// Local address; its port is sent to the peer as the reply port in the
/// initial `ClusterMessage::Request`.
own_ip: SocketAddr,
/// Address of the remote node. `set_reply_port` overwrites its port with the
/// one announced by the peer.
pub addr: SocketAddr,
/// Holds the pre-established stream handed in by `from_stream` until
/// `frame_stream` pops it; empty when this side has to dial out.
stream: Vec<TcpStream>,
/// Set to `true` by `finish_connecting`; not read anywhere in this file.
connected: bool,
/// This actor's own address, recorded in `started`.
own_addr: Option<Addr<NetworkInterface>>,
/// Number of failed connect attempts since `started`; `stopping` keeps
/// retrying while this is below 2.
counter: i8,
/// Seed flag sent to the peer in the `Request` and reported in `MemberUp`;
/// replaced by the peer's value when a `Request` is received.
seed: bool,
/// Address of the `Writer` actor that owns the write half of the socket;
/// `None` until a stream has been framed.
writer: Option<Addr<Writer>>,
}
impl Actor for NetworkInterface {
    type Context = Context<Self>;

    /// Records the actor's own address, resets the failed-attempt counter and
    /// then either frames the stream supplied at construction time or dials
    /// the peer.
    fn started(&mut self, ctx: &mut Context<Self>) {
        debug!("NetworkInterface started! {}", self.addr);
        self.own_addr = Some(ctx.address());
        self.counter = 0;

        let has_pending_stream = !self.stream.is_empty();
        if has_pending_stream {
            self.frame_stream(ctx);
        } else {
            self.connect_to_stream(ctx);
        }
    }

    /// Decides whether a stop request is honoured.
    ///
    /// While fewer than two connect attempts have failed, the stop is vetoed:
    /// any leftover stream is discarded and a new connection attempt is
    /// started. Otherwise the cluster is told the member is down and the actor
    /// is allowed to stop.
    fn stopping(&mut self, ctx: &mut Context<Self>) -> Running {
        if self.counter >= 2 {
            Cluster::from_custom_registry().do_send(NodeEvents::MemberDown(self.addr));
            return Running::Stop;
        }

        self.stream = Vec::new();
        self.connect_to_stream(ctx);
        Running::Continue
    }

    /// Logs that the actor has shut down.
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        debug!("NetworkInterface stopped! {}", self.addr);
    }
}
impl NetworkInterface {
/// Builds an interface that will dial `addr` itself once the actor starts.
///
/// `own_ip` is the local address whose port is advertised to the peer, and
/// `seed` is the flag forwarded in the initial request. No stream or writer
/// is attached yet and the interface is not marked as connected.
pub fn new(own_ip: SocketAddr, addr: SocketAddr, seed: bool) -> NetworkInterface {
    NetworkInterface {
        writer: None,
        own_addr: None,
        stream: Vec::new(),
        connected: false,
        counter: 0,
        own_ip,
        addr,
        seed,
    }
}
/// Builds an interface around an already established `stream`.
///
/// The interface is created as a non-seed one and keeps the stream until
/// the actor starts and frames it.
pub fn from_stream(own_ip: SocketAddr, addr: SocketAddr, stream: TcpStream) -> NetworkInterface {
    let mut interface = Self::new(own_ip, addr, false);
    interface.stream = vec![stream];
    interface
}
/// Takes the stored stream, splits it, and wires both halves up.
///
/// The write half is wrapped in a framed writer and handed to a freshly
/// started `Writer` actor; the read half is registered on this actor's
/// context as a stream of decoded messages.
///
/// Panics if no stream is stored; `started` only calls this when `stream`
/// is non-empty.
fn frame_stream(&mut self, ctx: &mut Context<Self>) {
    let (read_half, write_half) = self.stream.pop().unwrap().into_split();

    let sink = actix::io::FramedWrite::new(write_half, ConnectCodec::new(), ctx);
    self.writer = Some(Writer::new(sink).start());

    let incoming = FramedRead::new(read_half, ConnectCodec::new());
    ctx.add_stream(incoming);
}
fn connect_to_stream(&mut self, ctx: &mut Context<Self>) {
let addr = self.addr.clone().to_string();
Resolver::from_registry()
.send(Connect::host(addr))
.into_actor(self)
.map(|res, act, ctx| match res {
Ok(stream) => {
if let Ok(stream) = stream {
debug!(
"Connected to network node: {}",
act.addr.clone().to_string()
);
let (r, w) = stream.into_split();
let mut framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
let reply_port = act.own_ip.port();
framed.write(ClusterMessage::Request(reply_port, act.seed));
act.writer = Some(Writer::new(framed).start());
ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
} else {
debug!("Connection refused! Trying to reconnect!");
act.counter += 1;
sleep(Duration::from_secs(1));
ctx.stop();
}
}
Err(err) => error!("{}", err.to_string()),
})
.wait(ctx);
}
fn finish_connecting(&mut self) {
self.connected = true;
match self.own_addr.clone() {
Some(addr) => {
debug!("finish connecting to {}", self.addr.to_string());
let remote_address = RemoteAddr::new(
self.addr,
Some(addr.clone()),
AddrRepresentation::NetworkInterface,
);
Cluster::from_custom_registry().do_send(NodeEvents::MemberUp(
self.addr,
addr,
remote_address,
self.seed,
));
}
None => error!("NetworkInterface might not have been started already!"),
};
}
fn transmit_message(&mut self, msg: ClusterMessage) {
self.writer.as_ref().unwrap().do_send(msg);
}
fn received_message(&mut self, mut msg: RemoteWrapper) {
msg.source = self.own_addr.clone();
match msg.destination.id {
AddrRepresentation::NetworkInterface => {
panic!("NetworkInterface does not interact as RemoteActor")
}
AddrRepresentation::Gossip => Gossip::from_custom_registry().do_send(msg),
AddrRepresentation::Key(_) => AddrResolver::from_registry().do_send(msg),
}
}
/// Handles the peer's connection request.
///
/// Replaces the port of `self.addr` with the reply `port` announced by the
/// peer, adopts the peer's `seed` flag, and asks the `Cluster` whether the
/// connection is approved (passing both the new address and the address
/// the request arrived from).
///
/// * Approved: a `Response` is sent back and the connection is finalised.
/// * Declined: a `Decline` is sent back and the actor is asked to stop.
/// * If the approval request itself fails, nothing is done.
fn set_reply_port(&mut self, port: u16, ctx: &mut Context<Self>, seed: bool) {
    let send_addr = self.addr;
    self.addr.set_port(port);
    self.seed = seed;

    let approval = ConnectionApproval {
        addr: self.addr,
        send_addr,
    };

    Cluster::from_custom_registry()
        .send(approval)
        .into_actor(self)
        .map(|reply, act, ctx| match reply {
            Ok(ConnectionApprovalResponse::Approved) => {
                act.transmit_message(ClusterMessage::Response);
                act.finish_connecting();
            }
            Ok(ConnectionApprovalResponse::Declined) => {
                act.transmit_message(ClusterMessage::Decline);
                ctx.stop();
            }
            Err(_) => {}
        })
        .wait(ctx);
}
}
impl StreamHandler<Result<ClusterMessage, Error>> for NetworkInterface {
    /// Dispatches one decoded frame from the peer.
    ///
    /// Decode errors are logged as warnings and otherwise ignored. A
    /// `Request` starts the approval handshake, a `Response` completes it, a
    /// `Message` is routed to its local destination, and a `Decline` asks the
    /// actor to stop.
    fn handle(&mut self, item: Result<ClusterMessage, Error>, ctx: &mut Context<Self>) {
        let frame = match item {
            Ok(frame) => frame,
            Err(err) => {
                warn!("{}", err);
                return;
            }
        };

        match frame {
            ClusterMessage::Request(reply_port, seed) => {
                self.set_reply_port(reply_port, ctx, seed)
            }
            ClusterMessage::Response => self.finish_connecting(),
            ClusterMessage::Message(remote_message) => self.received_message(remote_message),
            ClusterMessage::Decline => ctx.stop(),
        }
    }
}
impl Handler<ClusterMessage> for NetworkInterface {
    type Result = ();

    /// Forwards a message from a local actor to the peer via `transmit_message`.
    fn handle(&mut self, outgoing: ClusterMessage, _: &mut Self::Context) {
        self.transmit_message(outgoing)
    }
}
/// Wrapper around a `ClusterMessage` for senders that want to know whether
/// the message reached the writer.
///
/// The handler replies with the result of sending the inner message to the
/// `Writer` actor, whereas a bare `ClusterMessage` is forwarded
/// fire-and-forget.
#[derive(Message)]
#[rtype(result = "Result<(), MailboxError>")]
pub struct WrappedClusterMessage(pub(crate) ClusterMessage);
impl Handler<WrappedClusterMessage> for NetworkInterface {
type Result = ResponseFuture<Result<(), MailboxError>>;
fn handle(&mut self, msg: WrappedClusterMessage, _ctx: &mut Self::Context) -> Self::Result {
Box::pin(self.writer.as_ref().unwrap().send(msg.0))
}
}
// Empty impl: relies entirely on the trait's default methods.
// NOTE(review): presumably required because `actix::io::FramedWrite::new` is
// called with this actor's context in `frame_stream`/`connect_to_stream` — confirm.
impl WriteHandler<Error> for NetworkInterface {}
// Empty impl: relies entirely on the trait's default methods.
// NOTE(review): presumably lets this actor be run under an actix supervisor;
// no supervisor is started in this file — confirm against callers.
impl Supervised for NetworkInterface {}
impl fmt::Debug for NetworkInterface {
    /// Renders the interface as `NetworkInterface(<peer address>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("NetworkInterface({})", self.addr))
    }
}