actix_telepathy/network/
mod.rs

1mod resolver;
2mod writer;
3
4use actix::prelude::*;
5use log::*;
6use std::io::Error;
7use std::net::SocketAddr;
8use tokio::net::TcpStream;
9
10use crate::cluster::{Cluster, NodeEvent};
11use crate::codec::{ClusterMessage, ConnectCodec};
12use crate::network::resolver::{Connect, Resolver};
13use crate::network::writer::Writer;
14use crate::remote::{AddrRepresentation, AddrResolver, RemoteWrapper};
15use crate::Node;
16use crate::{ConnectionApproval, ConnectionApprovalResponse, Connector, CustomSystemService};
17use actix::io::WriteHandler;
18use std::fmt;
19use std::thread::sleep;
20use tokio::time::Duration;
21use tokio_util::codec::FramedRead;
22
23pub struct NetworkInterface {
24    own_ip: SocketAddr,
25    pub addr: SocketAddr,
26    stream: Vec<TcpStream>,
27    connected: bool,
28    own_addr: Option<Addr<NetworkInterface>>,
29    counter: i8,
30    seed: bool,
31    writer: Option<Addr<Writer>>,
32}
33
34impl Actor for NetworkInterface {
35    type Context = Context<Self>;
36
37    fn started(&mut self, ctx: &mut Context<Self>) {
38        debug!(target: &self.own_ip.to_string(), "NetworkInterface started! {}", self.addr);
39        self.own_addr = Some(ctx.address());
40        self.counter = 0;
41        if self.stream.is_empty() {
42            self.connect_to_stream(ctx);
43        } else {
44            self.frame_stream(ctx);
45        }
46    }
47
48    fn stopping(&mut self, ctx: &mut Context<Self>) -> Running {
49        warn!(target: &self.own_ip.to_string(), "NetworkInterface stopping! {}, counter: {}", self.addr, self.counter);
50        if self.counter < 2 {
51            self.stream = vec![];
52            self.connect_to_stream(ctx);
53            return Running::Continue;
54        }
55
56        Cluster::from_custom_registry().do_send(NodeEvent::MemberDown(self.addr));
57        Running::Stop
58    }
59
60    fn stopped(&mut self, _ctx: &mut Context<Self>) {
61        debug!("NetworkInterface stopped! {}", self.addr);
62    }
63}
64
65impl NetworkInterface {
66    pub fn new(own_ip: SocketAddr, addr: SocketAddr, seed: bool) -> NetworkInterface {
67        NetworkInterface {
68            own_ip,
69            addr,
70            stream: vec![],
71            connected: false,
72            own_addr: None,
73            counter: 0,
74            seed,
75            writer: None,
76        }
77    }
78
79    pub fn from_stream(
80        own_ip: SocketAddr,
81        addr: SocketAddr,
82        stream: TcpStream,
83    ) -> NetworkInterface {
84        let mut ni = Self::new(own_ip, addr, false);
85        ni.stream.push(stream);
86        ni
87    }
88
89    fn frame_stream(&mut self, ctx: &mut Context<Self>) {
90        let stream = self.stream.pop().unwrap();
91        let (r, w) = stream.into_split();
92
93        let framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
94        self.writer = Some(Writer::new(framed).start());
95        ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
96    }
97
98    fn connect_to_stream(&mut self, ctx: &mut Context<Self>) {
99        let addr = self.addr.clone().to_string();
100
101        Resolver::from_registry()
102            .send(Connect::host(addr))
103            .into_actor(self)
104            .map(|res, act, ctx| match res {
105                Ok(stream) => {
106                    if let Ok(stream) = stream {
107                        debug!(
108                            "Connected to network node: {}",
109                            act.addr.clone().to_string()
110                        );
111
112                        let (r, w) = stream.into_split();
113
114                        // configure write side of the connection
115                        let mut framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
116                        let reply_port = act.own_ip.port();
117                        framed.write(ClusterMessage::Request(reply_port, act.seed));
118                        act.writer = Some(Writer::new(framed).start());
119
120                        ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
121                    } else {
122                        debug!("Connection refused! Trying to reconnect!");
123                        act.counter += 1;
124                        sleep(Duration::from_secs(1));
125                        ctx.stop();
126                    }
127                }
128                Err(err) => {
129                    error!("{} | {}", err.to_string(), act.addr.to_string());
130                    act.counter += 1;
131                    sleep(Duration::from_secs(1));
132                    ctx.stop();
133                }
134            })
135            .wait(ctx);
136    }
137
138    fn finish_connecting(&mut self, self_is_seed: bool) {
139        self.connected = true;
140
141        match self.own_addr.clone() {
142            Some(addr) => {
143                debug!(target: &self.own_ip.to_string(), "finish connecting to {}", self.addr.to_string());
144                let node = Node::new(self.addr, Some(addr));
145                Cluster::from_custom_registry().do_send(NodeEvent::MemberUp(node, self_is_seed));
146            }
147            None => error!("NetworkInterface might not have been started already!"),
148        };
149    }
150
151    fn transmit_message(&mut self, msg: ClusterMessage) {
152        self.writer.as_ref().unwrap().do_send(msg);
153    }
154
155    fn received_message(&mut self, mut msg: RemoteWrapper) {
156        msg.source = self.own_addr.clone();
157        match msg.destination.id {
158            AddrRepresentation::NetworkInterface => {
159                panic!("NetworkInterface does not interact as RemoteActor")
160            }
161            AddrRepresentation::Connector => Connector::from_custom_registry().do_send(msg),
162            AddrRepresentation::Key(_) => AddrResolver::from_registry().do_send(msg),
163        }
164    }
165
166    fn set_reply_port(&mut self, port: u16, ctx: &mut Context<Self>, seed: bool) {
167        let send_addr = self.addr;
168        self.addr.set_port(port);
169        let addr = self.addr;
170
171        Cluster::from_custom_registry()
172            .send(ConnectionApproval { addr, send_addr })
173            .into_actor(self)
174            .map(move |res, act, ctx| {
175                if let Ok(message_response) = res {
176                    match message_response {
177                        ConnectionApprovalResponse::Approved => {
178                            act.transmit_message(ClusterMessage::Response);
179                            act.finish_connecting(seed)
180                        }
181                        ConnectionApprovalResponse::Declined => {
182                            act.transmit_message(ClusterMessage::Decline);
183                            ctx.stop()
184                        }
185                    }
186                }
187            })
188            .wait(ctx);
189    }
190}
191
192impl StreamHandler<Result<ClusterMessage, Error>> for NetworkInterface {
193    fn handle(&mut self, item: Result<ClusterMessage, Error>, ctx: &mut Context<Self>) {
194        match item {
195            Ok(msg) => match msg {
196                ClusterMessage::Request(reply_port, seed) => {
197                    self.set_reply_port(reply_port, ctx, seed)
198                }
199                ClusterMessage::Response => self.finish_connecting(false),
200                ClusterMessage::Message(remote_message) => self.received_message(remote_message),
201                ClusterMessage::Decline => ctx.stop(),
202            },
203            Err(err) => warn!("{}", err),
204        }
205    }
206}
207
208impl Handler<ClusterMessage> for NetworkInterface {
209    type Result = ();
210
211    fn handle(&mut self, msg: ClusterMessage, _ctx: &mut Context<Self>) -> Self::Result {
212        self.transmit_message(msg);
213    }
214}
215
216#[derive(Message)]
217#[rtype(result = "Result<(), MailboxError>")]
218pub struct WrappedClusterMessage(pub(crate) ClusterMessage);
219
220impl Handler<WrappedClusterMessage> for NetworkInterface {
221    type Result = ResponseFuture<Result<(), MailboxError>>;
222
223    fn handle(&mut self, msg: WrappedClusterMessage, _ctx: &mut Self::Context) -> Self::Result {
224        Box::pin(self.writer.as_ref().unwrap().send(msg.0))
225    }
226}
227
228impl WriteHandler<Error> for NetworkInterface {}
229impl Supervised for NetworkInterface {}
230
231impl fmt::Debug for NetworkInterface {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        write!(f, "NetworkInterface({})", self.addr)
234    }
235}