Skip to main content

actix_telepathy/network/
mod.rs

1mod resolver;
2mod writer;
3
4use actix::prelude::*;
5use log::*;
6use std::io::Error;
7use std::net::SocketAddr;
8use tokio::net::TcpStream;
9
10use crate::Node;
11use crate::cluster::{Cluster, NodeEvent};
12use crate::codec::{ClusterMessage, ConnectCodec};
13use crate::network::resolver::{Connect, Resolver};
14use crate::network::writer::Writer;
15use crate::remote::{AddrRepresentation, AddrResolver, RemoteWrapper};
16use crate::{ConnectionApproval, ConnectionApprovalResponse, Connector, CustomSystemService};
17use actix::io::WriteHandler;
18use std::fmt;
19use std::thread::sleep;
20use tokio::time::Duration;
21use tokio_util::codec::FramedRead;
22
23pub struct NetworkInterface {
24    own_ip: SocketAddr,
25    pub addr: SocketAddr,
26    stream: Vec<TcpStream>,
27    connected: bool,
28    own_addr: Option<Addr<NetworkInterface>>,
29    counter: i8,
30    seed: bool,
31    writer: Option<Addr<Writer>>,
32}
33
34impl Actor for NetworkInterface {
35    type Context = Context<Self>;
36
37    fn started(&mut self, ctx: &mut Context<Self>) {
38        debug!(target: &self.own_ip.to_string(), "NetworkInterface started! {}", self.addr);
39        self.own_addr = Some(ctx.address());
40        self.counter = 0;
41        if self.stream.is_empty() {
42            self.connect_to_stream(ctx);
43        } else {
44            self.frame_stream(ctx);
45        }
46    }
47
48    fn stopping(&mut self, ctx: &mut Context<Self>) -> Running {
49        warn!(target: &self.own_ip.to_string(), "NetworkInterface stopping! {}, counter: {}", self.addr, self.counter);
50        if self.counter < 2 {
51            self.stream = vec![];
52            self.connect_to_stream(ctx);
53            return Running::Continue;
54        }
55
56        Cluster::from_custom_registry().do_send(NodeEvent::MemberDown(self.addr));
57        Running::Stop
58    }
59
60    fn stopped(&mut self, _ctx: &mut Context<Self>) {
61        debug!("NetworkInterface stopped! {}", self.addr);
62    }
63}
64
65impl NetworkInterface {
66    pub fn new(own_ip: SocketAddr, addr: SocketAddr, seed: bool) -> NetworkInterface {
67        NetworkInterface {
68            own_ip,
69            addr,
70            stream: vec![],
71            connected: false,
72            own_addr: None,
73            counter: 0,
74            seed,
75            writer: None,
76        }
77    }
78
79    pub fn from_stream(
80        own_ip: SocketAddr,
81        addr: SocketAddr,
82        stream: TcpStream,
83    ) -> NetworkInterface {
84        let mut ni = Self::new(own_ip, addr, false);
85        ni.stream.push(stream);
86        ni
87    }
88
89    fn frame_stream(&mut self, ctx: &mut Context<Self>) {
90        let stream = self.stream.pop().unwrap();
91        let (r, w) = stream.into_split();
92
93        let framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
94        self.writer = Some(Writer::new(framed).start());
95        ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
96    }
97
98    fn connect_to_stream(&mut self, ctx: &mut Context<Self>) {
99        let addr = self.addr.clone().to_string();
100
101        Resolver::from_registry()
102            .send(Connect::host(addr))
103            .into_actor(self)
104            .map(|res, act, ctx| match res {
105                Ok(stream) => {
106                    if let Ok(stream) = stream {
107                        debug!("Connected to network node: {}", act.addr);
108
109                        let (r, w) = stream.into_split();
110
111                        // configure write side of the connection
112                        let mut framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
113                        let reply_port = act.own_ip.port();
114                        framed.write(ClusterMessage::Request(reply_port, act.seed));
115                        act.writer = Some(Writer::new(framed).start());
116
117                        ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
118                    } else {
119                        debug!("Connection refused! Trying to reconnect!");
120                        act.counter += 1;
121                        sleep(Duration::from_secs(1));
122                        ctx.stop();
123                    }
124                }
125                Err(err) => {
126                    error!("{} | {}", err, act.addr);
127                    act.counter += 1;
128                    sleep(Duration::from_secs(1));
129                    ctx.stop();
130                }
131            })
132            .wait(ctx);
133    }
134
135    fn finish_connecting(&mut self, self_is_seed: bool) {
136        self.connected = true;
137
138        match self.own_addr.clone() {
139            Some(addr) => {
140                debug!(target: &self.own_ip.to_string(), "finish connecting to {}", self.addr);
141                let node = Node::new(self.addr, Some(addr));
142                Cluster::from_custom_registry().do_send(NodeEvent::MemberUp(node, self_is_seed));
143            }
144            None => error!("NetworkInterface might not have been started already!"),
145        };
146    }
147
148    fn transmit_message(&mut self, msg: ClusterMessage) {
149        self.writer.as_ref().unwrap().do_send(msg);
150    }
151
152    fn received_message(&mut self, mut msg: RemoteWrapper) {
153        msg.source = self.own_addr.clone();
154        match msg.destination.id {
155            AddrRepresentation::NetworkInterface => {
156                panic!("NetworkInterface does not interact as RemoteActor")
157            }
158            AddrRepresentation::Connector => Connector::from_custom_registry().do_send(msg),
159            AddrRepresentation::Key(_) => AddrResolver::from_registry().do_send(msg),
160        }
161    }
162
163    fn set_reply_port(&mut self, port: u16, ctx: &mut Context<Self>, seed: bool) {
164        let send_addr = self.addr;
165        self.addr.set_port(port);
166        let addr = self.addr;
167
168        Cluster::from_custom_registry()
169            .send(ConnectionApproval { addr, send_addr })
170            .into_actor(self)
171            .map(move |res, act, ctx| {
172                if let Ok(message_response) = res {
173                    match message_response {
174                        ConnectionApprovalResponse::Approved => {
175                            act.transmit_message(ClusterMessage::Response);
176                            act.finish_connecting(seed)
177                        }
178                        ConnectionApprovalResponse::Declined => {
179                            act.transmit_message(ClusterMessage::Decline);
180                            ctx.stop()
181                        }
182                    }
183                }
184            })
185            .wait(ctx);
186    }
187}
188
189impl StreamHandler<Result<ClusterMessage, Error>> for NetworkInterface {
190    fn handle(&mut self, item: Result<ClusterMessage, Error>, ctx: &mut Context<Self>) {
191        match item {
192            Ok(msg) => match msg {
193                ClusterMessage::Request(reply_port, seed) => {
194                    self.set_reply_port(reply_port, ctx, seed)
195                }
196                ClusterMessage::Response => self.finish_connecting(false),
197                ClusterMessage::Message(remote_message) => self.received_message(remote_message),
198                ClusterMessage::Decline => ctx.stop(),
199            },
200            Err(err) => warn!("{}", err),
201        }
202    }
203}
204
205impl Handler<ClusterMessage> for NetworkInterface {
206    type Result = ();
207
208    fn handle(&mut self, msg: ClusterMessage, _ctx: &mut Context<Self>) -> Self::Result {
209        self.transmit_message(msg);
210    }
211}
212
213#[derive(Message)]
214#[rtype(result = "Result<(), MailboxError>")]
215pub struct WrappedClusterMessage(pub(crate) ClusterMessage);
216
217impl Handler<WrappedClusterMessage> for NetworkInterface {
218    type Result = ResponseFuture<Result<(), MailboxError>>;
219
220    fn handle(&mut self, msg: WrappedClusterMessage, _ctx: &mut Self::Context) -> Self::Result {
221        Box::pin(self.writer.as_ref().unwrap().send(msg.0))
222    }
223}
224
225impl WriteHandler<Error> for NetworkInterface {}
226impl Supervised for NetworkInterface {}
227
228impl fmt::Debug for NetworkInterface {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        write!(f, "NetworkInterface({})", self.addr)
231    }
232}