1mod resolver;
2mod writer;
3
4use actix::prelude::*;
5use log::*;
6use std::io::Error;
7use std::net::SocketAddr;
8use tokio::net::TcpStream;
9
10use crate::cluster::{Cluster, NodeEvent};
11use crate::codec::{ClusterMessage, ConnectCodec};
12use crate::network::resolver::{Connect, Resolver};
13use crate::network::writer::Writer;
14use crate::remote::{AddrRepresentation, AddrResolver, RemoteWrapper};
15use crate::Node;
16use crate::{ConnectionApproval, ConnectionApprovalResponse, Connector, CustomSystemService};
17use actix::io::WriteHandler;
18use std::fmt;
19use std::thread::sleep;
20use tokio::time::Duration;
21use tokio_util::codec::FramedRead;
22
23pub struct NetworkInterface {
24 own_ip: SocketAddr,
25 pub addr: SocketAddr,
26 stream: Vec<TcpStream>,
27 connected: bool,
28 own_addr: Option<Addr<NetworkInterface>>,
29 counter: i8,
30 seed: bool,
31 writer: Option<Addr<Writer>>,
32}
33
34impl Actor for NetworkInterface {
35 type Context = Context<Self>;
36
37 fn started(&mut self, ctx: &mut Context<Self>) {
38 debug!(target: &self.own_ip.to_string(), "NetworkInterface started! {}", self.addr);
39 self.own_addr = Some(ctx.address());
40 self.counter = 0;
41 if self.stream.is_empty() {
42 self.connect_to_stream(ctx);
43 } else {
44 self.frame_stream(ctx);
45 }
46 }
47
48 fn stopping(&mut self, ctx: &mut Context<Self>) -> Running {
49 warn!(target: &self.own_ip.to_string(), "NetworkInterface stopping! {}, counter: {}", self.addr, self.counter);
50 if self.counter < 2 {
51 self.stream = vec![];
52 self.connect_to_stream(ctx);
53 return Running::Continue;
54 }
55
56 Cluster::from_custom_registry().do_send(NodeEvent::MemberDown(self.addr));
57 Running::Stop
58 }
59
60 fn stopped(&mut self, _ctx: &mut Context<Self>) {
61 debug!("NetworkInterface stopped! {}", self.addr);
62 }
63}
64
65impl NetworkInterface {
66 pub fn new(own_ip: SocketAddr, addr: SocketAddr, seed: bool) -> NetworkInterface {
67 NetworkInterface {
68 own_ip,
69 addr,
70 stream: vec![],
71 connected: false,
72 own_addr: None,
73 counter: 0,
74 seed,
75 writer: None,
76 }
77 }
78
79 pub fn from_stream(
80 own_ip: SocketAddr,
81 addr: SocketAddr,
82 stream: TcpStream,
83 ) -> NetworkInterface {
84 let mut ni = Self::new(own_ip, addr, false);
85 ni.stream.push(stream);
86 ni
87 }
88
89 fn frame_stream(&mut self, ctx: &mut Context<Self>) {
90 let stream = self.stream.pop().unwrap();
91 let (r, w) = stream.into_split();
92
93 let framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
94 self.writer = Some(Writer::new(framed).start());
95 ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
96 }
97
98 fn connect_to_stream(&mut self, ctx: &mut Context<Self>) {
99 let addr = self.addr.clone().to_string();
100
101 Resolver::from_registry()
102 .send(Connect::host(addr))
103 .into_actor(self)
104 .map(|res, act, ctx| match res {
105 Ok(stream) => {
106 if let Ok(stream) = stream {
107 debug!(
108 "Connected to network node: {}",
109 act.addr.clone().to_string()
110 );
111
112 let (r, w) = stream.into_split();
113
114 let mut framed = actix::io::FramedWrite::new(w, ConnectCodec::new(), ctx);
116 let reply_port = act.own_ip.port();
117 framed.write(ClusterMessage::Request(reply_port, act.seed));
118 act.writer = Some(Writer::new(framed).start());
119
120 ctx.add_stream(FramedRead::new(r, ConnectCodec::new()));
121 } else {
122 debug!("Connection refused! Trying to reconnect!");
123 act.counter += 1;
124 sleep(Duration::from_secs(1));
125 ctx.stop();
126 }
127 }
128 Err(err) => {
129 error!("{} | {}", err.to_string(), act.addr.to_string());
130 act.counter += 1;
131 sleep(Duration::from_secs(1));
132 ctx.stop();
133 }
134 })
135 .wait(ctx);
136 }
137
138 fn finish_connecting(&mut self, self_is_seed: bool) {
139 self.connected = true;
140
141 match self.own_addr.clone() {
142 Some(addr) => {
143 debug!(target: &self.own_ip.to_string(), "finish connecting to {}", self.addr.to_string());
144 let node = Node::new(self.addr, Some(addr));
145 Cluster::from_custom_registry().do_send(NodeEvent::MemberUp(node, self_is_seed));
146 }
147 None => error!("NetworkInterface might not have been started already!"),
148 };
149 }
150
151 fn transmit_message(&mut self, msg: ClusterMessage) {
152 self.writer.as_ref().unwrap().do_send(msg);
153 }
154
155 fn received_message(&mut self, mut msg: RemoteWrapper) {
156 msg.source = self.own_addr.clone();
157 match msg.destination.id {
158 AddrRepresentation::NetworkInterface => {
159 panic!("NetworkInterface does not interact as RemoteActor")
160 }
161 AddrRepresentation::Connector => Connector::from_custom_registry().do_send(msg),
162 AddrRepresentation::Key(_) => AddrResolver::from_registry().do_send(msg),
163 }
164 }
165
166 fn set_reply_port(&mut self, port: u16, ctx: &mut Context<Self>, seed: bool) {
167 let send_addr = self.addr;
168 self.addr.set_port(port);
169 let addr = self.addr;
170
171 Cluster::from_custom_registry()
172 .send(ConnectionApproval { addr, send_addr })
173 .into_actor(self)
174 .map(move |res, act, ctx| {
175 if let Ok(message_response) = res {
176 match message_response {
177 ConnectionApprovalResponse::Approved => {
178 act.transmit_message(ClusterMessage::Response);
179 act.finish_connecting(seed)
180 }
181 ConnectionApprovalResponse::Declined => {
182 act.transmit_message(ClusterMessage::Decline);
183 ctx.stop()
184 }
185 }
186 }
187 })
188 .wait(ctx);
189 }
190}
191
192impl StreamHandler<Result<ClusterMessage, Error>> for NetworkInterface {
193 fn handle(&mut self, item: Result<ClusterMessage, Error>, ctx: &mut Context<Self>) {
194 match item {
195 Ok(msg) => match msg {
196 ClusterMessage::Request(reply_port, seed) => {
197 self.set_reply_port(reply_port, ctx, seed)
198 }
199 ClusterMessage::Response => self.finish_connecting(false),
200 ClusterMessage::Message(remote_message) => self.received_message(remote_message),
201 ClusterMessage::Decline => ctx.stop(),
202 },
203 Err(err) => warn!("{}", err),
204 }
205 }
206}
207
208impl Handler<ClusterMessage> for NetworkInterface {
209 type Result = ();
210
211 fn handle(&mut self, msg: ClusterMessage, _ctx: &mut Context<Self>) -> Self::Result {
212 self.transmit_message(msg);
213 }
214}
215
216#[derive(Message)]
217#[rtype(result = "Result<(), MailboxError>")]
218pub struct WrappedClusterMessage(pub(crate) ClusterMessage);
219
220impl Handler<WrappedClusterMessage> for NetworkInterface {
221 type Result = ResponseFuture<Result<(), MailboxError>>;
222
223 fn handle(&mut self, msg: WrappedClusterMessage, _ctx: &mut Self::Context) -> Self::Result {
224 Box::pin(self.writer.as_ref().unwrap().send(msg.0))
225 }
226}
227
228impl WriteHandler<Error> for NetworkInterface {}
229impl Supervised for NetworkInterface {}
230
231impl fmt::Debug for NetworkInterface {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 write!(f, "NetworkInterface({})", self.addr)
234 }
235}