1use crate::{
2 connection::{Message, NodeAddr, NodeMap},
3 error::Error,
4 Result,
5};
6use message_io::{
7 events::EventQueue,
8 network::{Endpoint, NetEvent, Network, Transport},
9};
10use std::{
11 net::SocketAddr,
12 sync::{Arc, Mutex},
13 time::Duration,
14};
15
16pub struct Node {
18 pub connections: Arc<Mutex<NodeMap>>,
20 pub node_addr: SocketAddr,
22 pub duration: u32,
25 pub network: Arc<Mutex<Network>>,
27 pub event_queue: EventQueue<NetEvent>,
29 pub peer: Option<String>,
31}
32
33impl Node {
34 pub fn new(port: u32, duration: u32, peer: Option<String>) -> Result<Self> {
36 let (mut network, event_queue) = Network::split();
37
38 let listening_addr = format!("127.0.0.1:{}", port);
40 match network.listen(Transport::FramedTcp, &listening_addr) {
41 Ok((_, addr)) => {
42 log_my_address(&addr);
43
44 Ok(Self {
45 connections: Arc::new(Mutex::new(NodeMap::new(addr))),
46 node_addr: addr,
47 duration,
48 network: Arc::new(Mutex::new(network)),
49 event_queue,
50 peer,
51 })
52 }
53 Err(e) => Err(Error::NetworkListeningError(format!(
54 "{}: {}",
55 e, listening_addr
56 ))),
57 }
58 }
59
60 pub fn execute(mut self) -> Result<()> {
62 if let Some(addr) = &self.peer {
63 let mut network = self
64 .network
65 .lock()
66 .map_err(|e| Error::NetworkLockError(e.to_string()))?;
67
68 match network.connect(Transport::FramedTcp, addr) {
70 Ok((endpoint, _)) => {
71 {
72 let mut nodes = self
73 .connections
74 .lock()
75 .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
76 nodes.add_old_one(endpoint);
77 }
78
79 send_message(
80 &mut network,
81 endpoint,
82 &Message::RetrievePubAddr(self.node_addr),
83 )?;
84
85 send_message(&mut network, endpoint, &Message::RetrievePeerList)?;
88 }
89 Err(e) => {
90 return Err(Error::NetworkConnectionError(format!("{}: {}", e, &addr)));
91 }
92 }
93 }
94
95 self.spawn_emit_loop()?;
97
98 self.process_message()?;
100
101 Ok(())
102 }
103
104 fn spawn_emit_loop(&self) -> Result<()> {
105 let sleep_duration = Duration::from_secs(self.duration as u64);
106 let peers_mut = Arc::clone(&self.connections);
107 let network_mut = Arc::clone(&self.network);
108
109 std::thread::spawn(move || {
110 loop {
112 std::thread::sleep(sleep_duration);
113
114 let peers = peers_mut.lock().expect("Unable to obtain mutex on peers");
115 let receivers = peers.fetch_receivers();
116
117 if receivers.is_empty() {
119 continue;
120 }
121
122 let mut network = network_mut.lock().expect("Failed to lock network");
123
124 let msg_text = generate_random_message();
125 let msg = Message::RequestRandomInfo(msg_text.clone());
126
127 log_sending_message(
128 &msg_text,
129 &receivers
130 .iter()
131 .map(|NodeAddr { public, .. }| public)
132 .collect(),
133 );
134
135 for NodeAddr { endpoint, .. } in receivers {
136 send_message(&mut network, endpoint, &msg).expect("Failed to send message");
137 }
138 }
139 });
140 Ok(())
141 }
142
143 fn process_message(&mut self) -> Result<()> {
144 loop {
145 match self.event_queue.receive() {
146 NetEvent::Message(message_sender, input_data) => {
148 match bincode::deserialize(&input_data)? {
149 Message::RetrievePubAddr(pub_addr) => {
150 let mut peers = self
151 .connections
152 .lock()
153 .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
154 peers.add_new_one(message_sender, pub_addr);
155 }
156 Message::RetrievePeerList => {
157 let list = {
158 let peers = self
159 .connections
160 .lock()
161 .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
162 peers.get_peers_list()
163 };
164 let msg = Message::RespondToListQuery(list);
165 send_message(
166 &mut self.network.lock().expect("Error in sending message"),
167 message_sender,
168 &msg,
169 )?;
170 }
171 Message::RespondToListQuery(addrs) => {
172 let filtered: Vec<&SocketAddr> =
173 addrs.iter().filter(|x| *x != &self.node_addr).collect();
174
175 log_connected_to_the_peers(&filtered);
176
177 let mut network = self
178 .network
179 .lock()
180 .map_err(|e| Error::NetworkLockError(e.to_string()))?;
181
182 for peer in filtered {
183 if peer == &message_sender.addr() {
184 continue;
185 }
186
187 let (endpoint, _) = network
189 .connect(Transport::FramedTcp, *peer)
190 .map_err(|e| Error::NetworkConnectionError(e.to_string()))?;
191
192 let msg = Message::RetrievePubAddr(self.node_addr);
194 send_message(&mut network, endpoint, &msg)?;
195
196 self.connections
198 .lock()
199 .map_err(|e| Error::AddPeerError(e.to_string()))?
200 .add_old_one(endpoint);
201 }
202 }
203 Message::RequestRandomInfo(text) => {
204 let pub_addr = self
205 .connections
206 .lock()
207 .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?
208 .get_pub_addr(&message_sender)
209 .expect("Error in fetching public address");
210 log_message_received(&pub_addr, &text);
211 }
212 }
213 }
214 NetEvent::Connected(_, _) => {}
215 NetEvent::Disconnected(endpoint) => {
216 let mut peers = self
217 .connections
218 .lock()
219 .map_err(|e| Error::ConnectionsFetchError(e.to_string()))?;
220 NodeMap::drop(&mut peers, endpoint);
221 }
222 }
223 }
224 }
225}
226
227fn send_message(network: &mut Network, to: Endpoint, msg: &Message) -> Result<()> {
228 let output_data = bincode::serialize(msg)?;
229 network.send(to, &output_data);
230 Ok(())
231}
232
233fn generate_random_message() -> String {
234 petname::Petnames::default().generate_one(2, "-")
235}
236
237trait ToSocketAddr {
238 fn get_addr(&self) -> SocketAddr;
239}
240
241impl ToSocketAddr for Endpoint {
242 fn get_addr(&self) -> SocketAddr {
243 self.addr()
244 }
245}
246
247impl ToSocketAddr for &Endpoint {
248 fn get_addr(&self) -> SocketAddr {
249 self.addr()
250 }
251}
252
253impl ToSocketAddr for SocketAddr {
254 fn get_addr(&self) -> SocketAddr {
255 *self
256 }
257}
258
259impl ToSocketAddr for &SocketAddr {
260 fn get_addr(&self) -> SocketAddr {
261 **self
262 }
263}
264
265fn format_list_of_addrs<T: ToSocketAddr>(items: &Vec<T>) -> String {
266 if items.is_empty() {
267 "[no one]".to_owned()
268 } else {
269 let joined = items
270 .iter()
271 .map(|x| format!("\"{}\"", ToSocketAddr::get_addr(x)))
272 .collect::<Vec<String>>()
273 .join(", ");
274
275 format!("[{}]", joined)
276 }
277}
278
279fn log_message_received<T: ToSocketAddr>(from: &T, text: &str) {
280 log::info!(
281 "Received message [{}] from \"{}\"",
282 text,
283 ToSocketAddr::get_addr(from)
284 );
285}
286
287fn log_my_address<T: ToSocketAddr>(addr: &T) {
288 log::info!("My address is \"{}\"", ToSocketAddr::get_addr(addr));
289}
290
291fn log_connected_to_the_peers<T: ToSocketAddr>(peers: &Vec<T>) {
292 log::info!("Connected to the peers at {}", format_list_of_addrs(peers));
293}
294
295fn log_sending_message<T: ToSocketAddr>(message: &str, receivers: &Vec<T>) {
296 log::info!(
297 "Sending message [{}] to {}",
298 message,
299 format_list_of_addrs(receivers)
300 );
301}